diff --git a/sharp_frames/processing/colorspace.py b/sharp_frames/processing/colorspace.py new file mode 100644 index 0000000..b0b1b1a --- /dev/null +++ b/sharp_frames/processing/colorspace.py @@ -0,0 +1,349 @@ +"""Color space detection and FFmpeg filter generation for proper color conversion. + +This module handles detection of video color spaces (especially iPhone HDR/wide gamut) +and generates appropriate FFmpeg filter chains to convert to sRGB/BT.709. +""" + +from dataclasses import dataclass +from typing import Optional, Dict, Any +from enum import Enum +import subprocess +import json +import os + + +__all__ = [ + 'VideoColorInfo', + 'ColorPrimaries', + 'TransferFunction', + 'ColorMatrix', + 'detect_color_space', + 'parse_color_info_from_stream', + 'build_colorspace_filter', + 'get_color_info_description', + 'is_zscale_available', +] + + +class ColorPrimaries(Enum): + """Video color primaries (ITU-T H.273).""" + BT709 = "bt709" # sRGB/Rec.709 + BT2020 = "bt2020" # Wide gamut (HDR) + DISPLAY_P3 = "smpte432" # Display P3 (Apple) + UNKNOWN = "unknown" + + +class TransferFunction(Enum): + """Transfer characteristics (gamma/EOTF).""" + BT709 = "bt709" # SDR + SRGB = "iec61966-2-1" # sRGB + PQ = "smpte2084" # HDR10/Dolby Vision + HLG = "arib-std-b67" # Hybrid Log-Gamma + UNKNOWN = "unknown" + + +class ColorMatrix(Enum): + """Color matrix coefficients.""" + BT709 = "bt709" + BT2020_NCL = "bt2020nc" # Non-constant luminance + BT2020_CL = "bt2020c" # Constant luminance + UNKNOWN = "unknown" + + +@dataclass +class VideoColorInfo: + """Detected color space information from video.""" + color_primaries: ColorPrimaries + transfer_function: TransferFunction + color_matrix: ColorMatrix + is_hdr: bool + max_luminance: Optional[float] = None + + @property + def needs_conversion(self) -> bool: + """Check if video needs color space conversion to sRGB/BT.709.""" + # Convert if not standard BT.709 SDR + if self.is_hdr: + return True + if self.color_primaries not in [ColorPrimaries.BT709, ColorPrimaries.UNKNOWN]: + return True + if self.transfer_function not in [TransferFunction.BT709, TransferFunction.SRGB, TransferFunction.UNKNOWN]: + return True + return False + + @property + def is_wide_gamut_sdr(self) -> bool: + """Check if this is wide gamut SDR (e.g., Display P3 without HDR).""" + return ( + self.color_primaries in [ColorPrimaries.DISPLAY_P3, ColorPrimaries.BT2020] and + not self.is_hdr + ) + + +def check_zscale_available() -> bool: + """Check if FFmpeg has zscale filter available (requires libzimg).""" + ffmpeg_executable = 'ffmpeg.exe' if os.name == 'nt' else 'ffmpeg' + try: + result = subprocess.run( + [ffmpeg_executable, '-filters'], + capture_output=True, + text=True, + timeout=10 + ) + return 'zscale' in result.stdout + except (subprocess.TimeoutExpired, FileNotFoundError, OSError): + return False + + +# Cache the zscale availability check +_zscale_available: Optional[bool] = None + + +def is_zscale_available() -> bool: + """Check if zscale is available (cached).""" + global _zscale_available + if _zscale_available is None: + _zscale_available = check_zscale_available() + return _zscale_available + + +def detect_color_space(video_path: str) -> VideoColorInfo: + """ + Detect video color space using ffprobe. + + Args: + video_path: Path to the video file + + Returns: + VideoColorInfo with detected or assumed defaults + """ + ffprobe_executable = 'ffprobe.exe' if os.name == 'nt' else 'ffprobe' + + cmd = [ + ffprobe_executable, '-v', 'quiet', + '-print_format', 'json', + '-show_streams', '-select_streams', 'v:0', + os.path.normpath(video_path) + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + if result.returncode != 0: + return _default_color_info() + + data = json.loads(result.stdout) + streams = data.get('streams', []) + if not streams: + return _default_color_info() + + stream = streams[0] + return parse_color_info_from_stream(stream) + + except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError, OSError): + return _default_color_info() + + +def parse_color_info_from_stream(stream: Dict[str, Any]) -> VideoColorInfo: + """Parse ffprobe stream data into VideoColorInfo. + + This can be used to extract color info from existing ffprobe data + without making an additional subprocess call. + + Args: + stream: A video stream dict from ffprobe JSON output + + Returns: + VideoColorInfo with parsed color space information + """ + # Extract color metadata + color_primaries_str = stream.get('color_primaries', 'unknown') + transfer_str = stream.get('color_transfer', 'unknown') + color_matrix_str = stream.get('color_space', 'unknown') + + # Map to enums + primaries = _map_color_primaries(color_primaries_str) + transfer = _map_transfer_function(transfer_str) + matrix = _map_color_matrix(color_matrix_str) + + # Determine if HDR based on transfer function + is_hdr = transfer in [TransferFunction.PQ, TransferFunction.HLG] + + # Try to get max luminance from side data (for HDR content) + max_luminance = None + side_data = stream.get('side_data_list', []) + for data in side_data: + if data.get('side_data_type') == 'Mastering display metadata': + max_lum_str = data.get('max_luminance', '') + if '/' in str(max_lum_str): + try: + num, denom = str(max_lum_str).split('/') + max_luminance = float(num) / float(denom) + except (ValueError, ZeroDivisionError): + pass + + return VideoColorInfo( + color_primaries=primaries, + transfer_function=transfer, + color_matrix=matrix, + is_hdr=is_hdr, + max_luminance=max_luminance + ) + + +def _map_color_primaries(value: str) -> ColorPrimaries: + """Map ffprobe color_primaries to enum.""" + if not value: + return ColorPrimaries.UNKNOWN + mapping = { + 'bt709': ColorPrimaries.BT709, + 'bt2020': ColorPrimaries.BT2020, + 'smpte432': ColorPrimaries.DISPLAY_P3, + } + return mapping.get(value.lower(), ColorPrimaries.UNKNOWN) + + +def _map_transfer_function(value: str) -> TransferFunction: + """Map ffprobe color_transfer to enum.""" + if not value: + return TransferFunction.UNKNOWN + mapping = { + 'bt709': TransferFunction.BT709, + 'iec61966-2-1': TransferFunction.SRGB, + 'smpte2084': TransferFunction.PQ, + 'arib-std-b67': TransferFunction.HLG, + } + return mapping.get(value.lower(), TransferFunction.UNKNOWN) + + +def _map_color_matrix(value: str) -> ColorMatrix: + """Map ffprobe color_space (matrix) to enum.""" + if not value: + return ColorMatrix.UNKNOWN + mapping = { + 'bt709': ColorMatrix.BT709, + 'bt2020nc': ColorMatrix.BT2020_NCL, + 'bt2020c': ColorMatrix.BT2020_CL, + } + return mapping.get(value.lower(), ColorMatrix.UNKNOWN) + + +def _default_color_info() -> VideoColorInfo: + """Return default color info (assume unknown - no conversion).""" + return VideoColorInfo( + color_primaries=ColorPrimaries.UNKNOWN, + transfer_function=TransferFunction.UNKNOWN, + color_matrix=ColorMatrix.UNKNOWN, + is_hdr=False + ) + + +def build_colorspace_filter(color_info: VideoColorInfo) -> Optional[str]: + """ + Build FFmpeg filter string for color space conversion to sRGB/BT.709. + + Args: + color_info: Detected color space information + + Returns: + Filter string to add to -vf, or None if no conversion needed + """ + if not color_info.needs_conversion: + return None + + if color_info.is_hdr: + return _build_hdr_to_sdr_filter(color_info) + elif color_info.is_wide_gamut_sdr: + return _build_wide_gamut_to_srgb_filter(color_info) + else: + return None + + +def _build_hdr_to_sdr_filter(color_info: VideoColorInfo) -> str: + """ + Build HDR to SDR tone mapping filter. + + Uses zscale for high-quality conversion if available, + otherwise falls back to basic colorspace filter. + """ + # Determine input transfer function for zscale + if color_info.transfer_function == TransferFunction.PQ: + transfer_in = "smpte2084" + elif color_info.transfer_function == TransferFunction.HLG: + transfer_in = "arib-std-b67" + else: + transfer_in = "smpte2084" # Default to PQ for unknown HDR + + # Determine input primaries + if color_info.color_primaries == ColorPrimaries.BT2020: + primaries_in = "bt2020" + elif color_info.color_primaries == ColorPrimaries.DISPLAY_P3: + primaries_in = "smpte432" + else: + primaries_in = "bt2020" # Default to BT.2020 for HDR + + # Determine input matrix + if color_info.color_matrix in [ColorMatrix.BT2020_NCL, ColorMatrix.BT2020_CL]: + matrix_in = "bt2020nc" + else: + matrix_in = "bt2020nc" # Default for HDR + + if is_zscale_available(): + # Full HDR to SDR pipeline with tone mapping + # Explicitly specify input parameters for reliable conversion + filter_chain = ( + f"zscale=tin={transfer_in}:min={matrix_in}:pin={primaries_in}:" + f"t=linear:npl=100," # Linearize with input specs + "format=gbrpf32le," # High precision intermediate + "zscale=p=bt709," # Convert primaries to BT.709 + "tonemap=hable:desat=0," # Hable tone mapping (filmic) + "zscale=t=bt709:m=bt709:r=tv," # Apply BT.709 transfer/matrix + "format=yuv420p" # Standard output format + ) + else: + # Fallback: basic colorspace conversion (won't tone map properly) + # This is better than nothing but may clip highlights + filter_chain = f"colorspace=all=bt709:iall={primaries_in}:fast=0" + + return filter_chain + + +def _build_wide_gamut_to_srgb_filter(color_info: VideoColorInfo) -> str: + """ + Build wide gamut SDR (Display P3/BT.2020) to sRGB/BT.709 filter. + """ + # Determine input color space for the filter + if color_info.color_primaries == ColorPrimaries.DISPLAY_P3: + input_primaries = "smpte432" + elif color_info.color_primaries == ColorPrimaries.BT2020: + input_primaries = "bt2020" + else: + input_primaries = "bt709" # Assume BT.709 if unknown + + # Use colorspace filter for gamut conversion (no tone mapping needed) + filter_chain = f"colorspace=all=bt709:iall={input_primaries}:fast=0" + + return filter_chain + + +def get_color_info_description(color_info: VideoColorInfo) -> str: + """Get a human-readable description of the color space.""" + parts = [] + + if color_info.is_hdr: + if color_info.transfer_function == TransferFunction.PQ: + parts.append("HDR10/Dolby Vision (PQ)") + elif color_info.transfer_function == TransferFunction.HLG: + parts.append("HLG HDR") + else: + parts.append("HDR") + else: + parts.append("SDR") + + if color_info.color_primaries == ColorPrimaries.DISPLAY_P3: + parts.append("Display P3") + elif color_info.color_primaries == ColorPrimaries.BT2020: + parts.append("BT.2020") + elif color_info.color_primaries == ColorPrimaries.BT709: + parts.append("BT.709") + + return " / ".join(parts) if parts else "Unknown" diff --git a/sharp_frames/processing/frame_extractor.py b/sharp_frames/processing/frame_extractor.py index 3e82dd7..ac28bf8 100644 --- a/sharp_frames/processing/frame_extractor.py +++ b/sharp_frames/processing/frame_extractor.py @@ -6,12 +6,15 @@ import tempfile import subprocess import shutil -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, TYPE_CHECKING from pathlib import Path from ..models.frame_data import FrameData, ExtractionResult from ..video_utils import get_video_files_in_directory +if TYPE_CHECKING: + from .colorspace import VideoColorInfo + class FrameExtractor: """Handles frame extraction from videos and loading from directories.""" @@ -67,21 +70,30 @@ def _load_images(self, image_directory: str) -> ExtractionResult: def _extract_video_frames(self, config: Dict[str, Any]) -> ExtractionResult: """Extract frames from single video file.""" + from .colorspace import get_color_info_description + video_path = config['input_path'] fps = config.get('fps', 10) output_format = config.get('output_format', 'jpg') width = config.get('width', 0) - + # Create temporary directory for extraction temp_dir = self._create_temp_directory() - + try: - # Extract video info + # Extract video info (includes color space data) video_info = self._get_video_info(video_path) duration = self._extract_duration_from_info(video_info) - - # Perform FFmpeg extraction - if not self._run_ffmpeg_extraction(video_path, temp_dir, fps, output_format, width, duration): + + # Extract color space from existing video_info (no extra ffprobe call) + color_info = self._extract_color_info_from_video_info(video_info) + if color_info.needs_conversion: + color_desc = get_color_info_description(color_info) + if self.progress_callback: + self.progress_callback("extraction", 0, 0, f"Detected {color_desc}, will convert to sRGB") + + # Perform FFmpeg extraction with color space handling + if not self._run_ffmpeg_extraction(video_path, temp_dir, fps, output_format, width, duration, color_info): raise RuntimeError("Frame extraction failed") # Get extracted frame paths @@ -192,17 +204,20 @@ def _extract_single_video(self, video_path: str, video_index: int, temp_dir: str video_name = f"video_{video_index + 1:03d}" video_temp_dir = os.path.join(temp_dir, video_name) os.makedirs(video_temp_dir, exist_ok=True) - + fps = config.get('fps', 10) output_format = config.get('output_format', 'jpg') width = config.get('width', 0) - - # Get video info + + # Get video info (includes color space data) video_info = self._get_video_info(video_path) duration = self._extract_duration_from_info(video_info) - - # Extract frames - if not self._run_ffmpeg_extraction(video_path, video_temp_dir, fps, output_format, width, duration): + + # Extract color space from existing video_info (no extra ffprobe call) + color_info = self._extract_color_info_from_video_info(video_info) + + # Extract frames with color space handling + if not self._run_ffmpeg_extraction(video_path, video_temp_dir, fps, output_format, width, duration, color_info): raise RuntimeError(f"Failed to extract frames from {video_path}") # Get extracted frame files @@ -301,18 +316,50 @@ def _extract_duration_from_info(self, video_info: Dict[str, Any]) -> Optional[fl return float(duration_str) if duration_str else None except (ValueError, TypeError): return None + + def _extract_color_info_from_video_info(self, video_info: Dict[str, Any]) -> 'VideoColorInfo': + """Extract color space info from existing video_info (avoids extra ffprobe call).""" + from .colorspace import parse_color_info_from_stream, VideoColorInfo, ColorPrimaries, TransferFunction, ColorMatrix + + streams = video_info.get('streams', []) + # Find the first video stream + for stream in streams: + if stream.get('codec_type') == 'video': + return parse_color_info_from_stream(stream) + + # No video stream found, return default + return VideoColorInfo( + color_primaries=ColorPrimaries.UNKNOWN, + transfer_function=TransferFunction.UNKNOWN, + color_matrix=ColorMatrix.UNKNOWN, + is_hdr=False + ) - def _run_ffmpeg_extraction(self, video_path: str, output_dir: str, fps: int, - output_format: str, width: int, duration: Optional[float] = None) -> bool: - """Run FFmpeg to extract frames from video with progress monitoring.""" + def _run_ffmpeg_extraction(self, video_path: str, output_dir: str, fps: int, + output_format: str, width: int, duration: Optional[float] = None, + color_info: Optional['VideoColorInfo'] = None) -> bool: + """Run FFmpeg to extract frames from video with progress monitoring and color space conversion.""" + from .colorspace import build_colorspace_filter + output_pattern = os.path.join(output_dir, f"frame_%05d.{output_format}") - - # Build video filters - vf_filters = [f"fps={fps}"] + + # Build video filters - order matters! + vf_filters = [] + + # 1. Color space conversion FIRST (before any scaling) + if color_info is not None: + colorspace_filter = build_colorspace_filter(color_info) + if colorspace_filter: + vf_filters.append(colorspace_filter) + + # 2. FPS filter + vf_filters.append(f"fps={fps}") + + # 3. Scale filter (after color conversion) if width > 0: # Use lanczos scaling for high-quality downsampling vf_filters.append(f"scale={width}:-1:flags=lanczos") - + vf_string = ",".join(vf_filters) # Use proper executable name based on platform diff --git a/sharp_frames/processing/minimal_progress.py b/sharp_frames/processing/minimal_progress.py index daf4243..f28bbfb 100644 --- a/sharp_frames/processing/minimal_progress.py +++ b/sharp_frames/processing/minimal_progress.py @@ -79,19 +79,29 @@ def _check_dependencies(self, check_ffmpeg: bool = True) -> bool: return True - def _build_ffmpeg_command(self, output_pattern: str) -> List[str]: - """Build the FFmpeg command for frame extraction.""" - # Build the video filters string + def _build_ffmpeg_command(self, output_pattern: str, color_info=None) -> List[str]: + """Build the FFmpeg command for frame extraction with color space conversion.""" + from .colorspace import build_colorspace_filter + + # Build the video filters string - order matters! vf_filters = [] + + # 1. Color space conversion FIRST (before any other processing) + if color_info is not None: + colorspace_filter = build_colorspace_filter(color_info) + if colorspace_filter: + vf_filters.append(colorspace_filter) + + # 2. FPS filter vf_filters.append(f"fps={self.fps}") - - # Add scaling filter if width is specified + + # 3. Scaling filter (after color conversion) if self.width > 0: vf_filters.append(f"scale={self.width}:-2") # -2 maintains aspect ratio and ensures even height - + # Join all filters with commas vf_string = ",".join(vf_filters) - + command = [ "ffmpeg", "-i", self.input_path, @@ -102,7 +112,7 @@ def _build_ffmpeg_command(self, output_pattern: str) -> List[str]: "-loglevel", "warning", # Show errors and warnings output_pattern ] - + return command def _estimate_total_frames(self, duration: Optional[float]) -> Optional[int]: @@ -234,12 +244,12 @@ def _finalize_extraction(self, process: subprocess.Popen, stderr_queue: queue.Qu return True - def _extract_frames(self, duration: float = None) -> bool: + def _extract_frames(self, duration: float = None, color_info=None) -> bool: """Override to add real-time progress tracking to frame extraction with proper cleanup.""" output_pattern = os.path.join(self.temp_dir, f"frame_%05d.{self.output_format}") - + # Build command and estimate progress - command = self._build_ffmpeg_command(output_pattern) + command = self._build_ffmpeg_command(output_pattern, color_info) estimated_total_frames = self._estimate_total_frames(duration) # Print the FFmpeg command for debugging diff --git a/sharp_frames/sharp_frames_processor.py b/sharp_frames/sharp_frames_processor.py index 675a137..5bbbfa9 100644 --- a/sharp_frames/sharp_frames_processor.py +++ b/sharp_frames/sharp_frames_processor.py @@ -127,9 +127,16 @@ def _load_input_frames(self) -> Tuple[List[str], bool]: if duration: print(f"Video duration: {self._format_duration(duration)}") + # Extract color space from existing video_info (no extra ffprobe call) + from .processing.colorspace import get_color_info_description + color_info = self._extract_color_info_from_video_info(video_info) + if color_info.needs_conversion: + print(f"Detected color space: {get_color_info_description(color_info)}") + print("Will convert to sRGB/BT.709") + print(f"Extracting frames at {self.fps} fps...") # Extract frames relies on self.temp_dir being set - self._extract_frames(duration) + self._extract_frames(duration, color_info) # Get paths from temp directory frame_paths = self._get_frame_paths() @@ -295,17 +302,24 @@ def _load_frames_from_video_directory(self) -> List[str]: # Extract video info and frames for this video print("Extracting video information...") + video_info = None + color_info = None try: video_info = self._get_video_info() duration = self._extract_duration(video_info) if duration: print(f"Video duration: {self._format_duration(duration)}") + # Extract color space from existing video_info (no extra ffprobe call) + color_info = self._extract_color_info_from_video_info(video_info) except Exception as e: print(f"Warning: Could not extract video info for {os.path.basename(video_path)}: {e}") duration = None - + # Fallback to separate color detection if video_info failed + from .processing.colorspace import detect_color_space + color_info = detect_color_space(video_path) + print(f"Extracting frames at {self.fps} fps...") - if self._extract_frames(duration): + if self._extract_frames(duration, color_info): # Get frame paths from this video video_frame_paths = self._get_frame_paths() print(f"Extracted {len(video_frame_paths)} frames from {os.path.basename(video_path)}") @@ -408,48 +422,74 @@ def _extract_duration(self, video_info: Dict[str, Any]) -> float: return None def _get_video_info(self) -> Dict[str, Any]: - """Get video metadata using FFmpeg""" - # Try using ffprobe for more detailed info + """Get video metadata using FFmpeg including color space info.""" + # Try using ffprobe for more detailed info including color metadata probe_command = [ - "ffprobe", + "ffprobe", "-v", "error", "-show_entries", "format=duration", "-select_streams", "v:0", - "-show_entries", "stream=width,height,avg_frame_rate,duration", + "-show_entries", "stream=width,height,avg_frame_rate,duration,color_primaries,color_transfer,color_space", "-of", "json", self.input_path ] - + try: probe_result = subprocess.run( - probe_command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + probe_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, check=True, text=True ) - + video_info = json.loads(probe_result.stdout) return video_info except subprocess.CalledProcessError: # Fallback if ffprobe fails return {"error": "Failed to get video info"} + + def _extract_color_info_from_video_info(self, video_info: Dict[str, Any]): + """Extract color space info from existing video_info (avoids extra ffprobe call).""" + from .processing.colorspace import parse_color_info_from_stream, VideoColorInfo, ColorPrimaries, TransferFunction, ColorMatrix + + streams = video_info.get('streams', []) + if streams: + return parse_color_info_from_stream(streams[0]) + + # No stream found, return default + return VideoColorInfo( + color_primaries=ColorPrimaries.UNKNOWN, + transfer_function=TransferFunction.UNKNOWN, + color_matrix=ColorMatrix.UNKNOWN, + is_hdr=False + ) - def _extract_frames(self, duration: float = None) -> bool: - """Extract frames from video using FFmpeg""" + def _extract_frames(self, duration: float = None, color_info=None) -> bool: + """Extract frames from video using FFmpeg with color space conversion.""" + from .processing.colorspace import build_colorspace_filter + output_pattern = os.path.join(self.temp_dir, f"frame_%05d.{self.output_format}") - + # Set a timeout threshold for the process in case it hangs process_timeout_seconds = 3600 # 1 hour timeout for FFmpeg process - - # Build the video filters string + + # Build the video filters string - order matters! vf_filters = [] + + # 1. Color space conversion FIRST (before any other processing) + if color_info is not None: + colorspace_filter = build_colorspace_filter(color_info) + if colorspace_filter: + vf_filters.append(colorspace_filter) + + # 2. FPS filter vf_filters.append(f"fps={self.fps}") - - # Add scaling filter if width is specified + + # 3. Scaling filter (after color conversion) if self.width > 0: vf_filters.append(f"scale={self.width}:-2") # -2 maintains aspect ratio and ensures even height - + # Join all filters with commas vf_string = ",".join(vf_filters) diff --git a/tests/test_colorspace.py b/tests/test_colorspace.py new file mode 100644 index 0000000..f4c33af --- /dev/null +++ b/tests/test_colorspace.py @@ -0,0 +1,356 @@ +"""Tests for the colorspace detection and conversion module.""" + +import pytest +from unittest.mock import patch, MagicMock + +from sharp_frames.processing.colorspace import ( + VideoColorInfo, + ColorPrimaries, + TransferFunction, + ColorMatrix, + detect_color_space, + parse_color_info_from_stream, + build_colorspace_filter, + get_color_info_description, + is_zscale_available, + _default_color_info, +) + + +class TestVideoColorInfo: + """Tests for VideoColorInfo dataclass.""" + + def test_bt709_sdr_no_conversion_needed(self): + """Standard BT.709 SDR should not need conversion.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT709, + transfer_function=TransferFunction.BT709, + color_matrix=ColorMatrix.BT709, + is_hdr=False + ) + assert info.needs_conversion is False + assert info.is_wide_gamut_sdr is False + + def test_unknown_no_conversion_needed(self): + """Unknown color space should not trigger conversion (safe default).""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.UNKNOWN, + transfer_function=TransferFunction.UNKNOWN, + color_matrix=ColorMatrix.UNKNOWN, + is_hdr=False + ) + assert info.needs_conversion is False + + def test_display_p3_needs_conversion(self): + """Display P3 SDR should need conversion.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.DISPLAY_P3, + transfer_function=TransferFunction.BT709, + color_matrix=ColorMatrix.BT709, + is_hdr=False + ) + assert info.needs_conversion is True + assert info.is_wide_gamut_sdr is True + + def test_bt2020_sdr_needs_conversion(self): + """BT.2020 SDR should need conversion.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT2020, + transfer_function=TransferFunction.BT709, + color_matrix=ColorMatrix.BT709, + is_hdr=False + ) + assert info.needs_conversion is True + assert info.is_wide_gamut_sdr is True + + def test_hdr_pq_needs_conversion(self): + """HDR with PQ transfer (HDR10/Dolby Vision) should need conversion.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT2020, + transfer_function=TransferFunction.PQ, + color_matrix=ColorMatrix.BT2020_NCL, + is_hdr=True + ) + assert info.needs_conversion is True + assert info.is_wide_gamut_sdr is False # It's HDR, not SDR + + def test_hdr_hlg_needs_conversion(self): + """HDR with HLG transfer should need conversion.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT2020, + transfer_function=TransferFunction.HLG, + color_matrix=ColorMatrix.BT2020_NCL, + is_hdr=True + ) + assert info.needs_conversion is True + + +class TestParseColorInfoFromStream: + """Tests for parse_color_info_from_stream function.""" + + def test_parse_bt709_stream(self): + """Parse BT.709 stream data.""" + stream = { + 'color_primaries': 'bt709', + 'color_transfer': 'bt709', + 'color_space': 'bt709' + } + info = parse_color_info_from_stream(stream) + assert info.color_primaries == ColorPrimaries.BT709 + assert info.transfer_function == TransferFunction.BT709 + assert info.color_matrix == ColorMatrix.BT709 + assert info.is_hdr is False + + def test_parse_display_p3_stream(self): + """Parse Display P3 stream data (iPhone SDR).""" + stream = { + 'color_primaries': 'smpte432', + 'color_transfer': 'bt709', + 'color_space': 'bt709' + } + info = parse_color_info_from_stream(stream) + assert info.color_primaries == ColorPrimaries.DISPLAY_P3 + assert info.is_hdr is False + + def test_parse_hdr10_stream(self): + """Parse HDR10 stream data.""" + stream = { + 'color_primaries': 'bt2020', + 'color_transfer': 'smpte2084', + 'color_space': 'bt2020nc' + } + info = parse_color_info_from_stream(stream) + assert info.color_primaries == ColorPrimaries.BT2020 + assert info.transfer_function == TransferFunction.PQ + assert info.color_matrix == ColorMatrix.BT2020_NCL + assert info.is_hdr is True + + def test_parse_hlg_stream(self): + """Parse HLG HDR stream data.""" + stream = { + 'color_primaries': 'bt2020', + 'color_transfer': 'arib-std-b67', + 'color_space': 'bt2020nc' + } + info = parse_color_info_from_stream(stream) + assert info.transfer_function == TransferFunction.HLG + assert info.is_hdr is True + + def test_parse_empty_stream(self): + """Parse stream with missing color data.""" + stream = {} + info = parse_color_info_from_stream(stream) + assert info.color_primaries == ColorPrimaries.UNKNOWN + assert info.transfer_function == TransferFunction.UNKNOWN + assert info.color_matrix == ColorMatrix.UNKNOWN + assert info.is_hdr is False + + def test_parse_stream_with_max_luminance(self): + """Parse HDR stream with mastering display metadata.""" + stream = { + 'color_primaries': 'bt2020', + 'color_transfer': 'smpte2084', + 'color_space': 'bt2020nc', + 'side_data_list': [ + { + 'side_data_type': 'Mastering display metadata', + 'max_luminance': '10000000/10000' + } + ] + } + info = parse_color_info_from_stream(stream) + assert info.max_luminance == 1000.0 + + +class TestBuildColorspaceFilter: + """Tests for build_colorspace_filter function.""" + + def test_no_filter_for_bt709(self): + """No filter needed for standard BT.709.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT709, + transfer_function=TransferFunction.BT709, + color_matrix=ColorMatrix.BT709, + is_hdr=False + ) + assert build_colorspace_filter(info) is None + + def test_no_filter_for_unknown(self): + """No filter for unknown color space (safe default).""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.UNKNOWN, + transfer_function=TransferFunction.UNKNOWN, + color_matrix=ColorMatrix.UNKNOWN, + is_hdr=False + ) + assert build_colorspace_filter(info) is None + + def test_filter_for_display_p3(self): + """Filter generated for Display P3.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.DISPLAY_P3, + transfer_function=TransferFunction.BT709, + color_matrix=ColorMatrix.BT709, + is_hdr=False + ) + filter_str = build_colorspace_filter(info) + assert filter_str is not None + assert 'colorspace' in filter_str + assert 'bt709' in filter_str + assert 'smpte432' in filter_str + + @patch('sharp_frames.processing.colorspace.is_zscale_available', return_value=True) + def test_hdr_filter_with_zscale(self, mock_zscale): + """HDR filter uses zscale when available.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT2020, + transfer_function=TransferFunction.PQ, + color_matrix=ColorMatrix.BT2020_NCL, + is_hdr=True + ) + filter_str = build_colorspace_filter(info) + assert filter_str is not None + assert 'zscale' in filter_str + assert 'tonemap' in filter_str + assert 'hable' in filter_str + # Check that input parameters are specified + assert 'tin=' in filter_str + assert 'pin=' in filter_str + + @patch('sharp_frames.processing.colorspace.is_zscale_available', return_value=False) + def test_hdr_filter_fallback_without_zscale(self, mock_zscale): + """HDR filter falls back to colorspace when zscale unavailable.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT2020, + transfer_function=TransferFunction.PQ, + color_matrix=ColorMatrix.BT2020_NCL, + is_hdr=True + ) + filter_str = build_colorspace_filter(info) + assert filter_str is not None + assert 'colorspace' in filter_str + assert 'zscale' not in filter_str + + +class TestGetColorInfoDescription: + """Tests for get_color_info_description function.""" + + def test_sdr_bt709_description(self): + """Description for standard SDR.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT709, + transfer_function=TransferFunction.BT709, + color_matrix=ColorMatrix.BT709, + is_hdr=False + ) + desc = get_color_info_description(info) + assert 'SDR' in desc + assert 'BT.709' in desc + + def test_display_p3_description(self): + """Description for Display P3.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.DISPLAY_P3, + transfer_function=TransferFunction.BT709, + color_matrix=ColorMatrix.BT709, + is_hdr=False + ) + desc = get_color_info_description(info) + assert 'SDR' in desc + assert 'Display P3' in desc + + def test_hdr_pq_description(self): + """Description for HDR10/Dolby Vision.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT2020, + transfer_function=TransferFunction.PQ, + color_matrix=ColorMatrix.BT2020_NCL, + is_hdr=True + ) + desc = get_color_info_description(info) + assert 'HDR10' in desc or 'Dolby Vision' in desc + assert 'BT.2020' in desc + + def test_hlg_description(self): + """Description for HLG HDR.""" + info = VideoColorInfo( + color_primaries=ColorPrimaries.BT2020, + transfer_function=TransferFunction.HLG, + color_matrix=ColorMatrix.BT2020_NCL, + is_hdr=True + ) + desc = get_color_info_description(info) + assert 'HLG' in desc + + +class TestDetectColorSpace: + """Tests for detect_color_space function with mocked ffprobe.""" + + @patch('subprocess.run') + def test_detect_bt709(self, mock_run): + """Detect BT.709 from ffprobe output.""" + mock_run.return_value = MagicMock( + returncode=0, + stdout='{"streams": [{"codec_type": "video", "color_primaries": "bt709", "color_transfer": "bt709", "color_space": "bt709"}]}' + ) + info = detect_color_space('/fake/video.mp4') + assert info.color_primaries == ColorPrimaries.BT709 + assert info.is_hdr is False + + @patch('subprocess.run') + def test_detect_hdr(self, mock_run): + """Detect HDR from ffprobe output.""" + mock_run.return_value = MagicMock( + returncode=0, + stdout='{"streams": [{"codec_type": "video", "color_primaries": "bt2020", "color_transfer": "smpte2084", "color_space": "bt2020nc"}]}' + ) + info = detect_color_space('/fake/video.mp4') + assert info.color_primaries == ColorPrimaries.BT2020 + assert info.transfer_function == TransferFunction.PQ + assert info.is_hdr is True + + @patch('subprocess.run') + def test_detect_failure_returns_default(self, mock_run): + """Return default color info when ffprobe fails.""" + mock_run.return_value = MagicMock(returncode=1, stdout='') + info = detect_color_space('/fake/video.mp4') + assert info.color_primaries == ColorPrimaries.UNKNOWN + assert info.is_hdr is False + + @patch('subprocess.run') + def test_detect_timeout_returns_default(self, mock_run): + """Return default color info on timeout.""" + from subprocess import TimeoutExpired + mock_run.side_effect = TimeoutExpired('ffprobe', 30) + info = detect_color_space('/fake/video.mp4') + assert info.color_primaries == ColorPrimaries.UNKNOWN + + +class TestIsZscaleAvailable: + """Tests for zscale availability check.""" + + @patch('subprocess.run') + def test_zscale_available(self, mock_run): + """Detect zscale when available.""" + mock_run.return_value = MagicMock( + returncode=0, + stdout='... zscale ... other filters ...' + ) + # Clear the cache + import sharp_frames.processing.colorspace as cs + cs._zscale_available = None + + assert is_zscale_available() is True + + @patch('subprocess.run') + def test_zscale_not_available(self, mock_run): + """Detect when zscale is not available.""" + mock_run.return_value = MagicMock( + returncode=0, + stdout='scale colorspace other_filter' + ) + # Clear the cache + import sharp_frames.processing.colorspace as cs + cs._zscale_available = None + + assert is_zscale_available() is False