diff --git a/TranscribeHttp/__init__.py b/TranscribeHttp/__init__.py index 6fecd2b..3af666b 100644 --- a/TranscribeHttp/__init__.py +++ b/TranscribeHttp/__init__.py @@ -42,7 +42,7 @@ def _cfg() -> SpeechConfig: return SpeechConfig( key=os.environ["SPEECH_KEY"], endpoint=endpoint.rstrip("/"), - api_version=os.environ.get("SPEECH_API_VERSION", "2025-10-15"), + api_version=os.environ.get("SPEECH_API_VERSION", "2024-11-15"), ) def main(req: func.HttpRequest) -> func.HttpResponse: diff --git a/add_url_fields.py b/add_url_fields.py new file mode 100644 index 0000000..a9a7ed8 --- /dev/null +++ b/add_url_fields.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +""" +Add URL tracking fields to Azure Search index +""" + +import os +import requests +import sys + +# Load from .env file +env_path = os.path.join(os.path.dirname(__file__), "ui", ".env") +env_vars = {} + +if os.path.exists(env_path): + with open(env_path) as f: + for line in f: + if '=' in line and not line.startswith('#'): + key, value = line.strip().split('=', 1) + env_vars[key] = value + +SEARCH_ENDPOINT = env_vars.get("SEARCH_ENDPOINT") +SEARCH_KEY = env_vars.get("SEARCH_KEY") +SEARCH_INDEX_NAME = env_vars.get("SEARCH_INDEX_NAME", "segments") + +print(f"Endpoint: {SEARCH_ENDPOINT}") +print(f"Index: {SEARCH_INDEX_NAME}") +print(f"Key: {'*' * 10}{SEARCH_KEY[-4:] if SEARCH_KEY else 'NOT FOUND'}") +print() + +if not SEARCH_ENDPOINT or not SEARCH_KEY: + print("ERROR: Missing SEARCH_ENDPOINT or SEARCH_KEY in .env") + sys.exit(1) + +API_VERSION = "2024-07-01" + +def get_index(): + url = f"{SEARCH_ENDPOINT}/indexes/{SEARCH_INDEX_NAME}?api-version={API_VERSION}" + headers = {"api-key": SEARCH_KEY} + + print(f"Fetching index: {url}") + response = requests.get(url, headers=headers) + + if response.status_code == 200: + return response.json() + else: + print(f"Failed to get index: {response.status_code}") + print(response.text) + return None + +def update_index(index_def): + url = 
f"{SEARCH_ENDPOINT}/indexes/{SEARCH_INDEX_NAME}?api-version={API_VERSION}" + headers = { + "Content-Type": "application/json", + "api-key": SEARCH_KEY + } + + response = requests.put(url, headers=headers, json=index_def) + + if response.status_code in [200, 201]: + print("✅ Index updated successfully!") + return True + else: + print(f"❌ Failed to update: {response.status_code}") + print(response.text) + return False + +def main(): + print("Fetching current index...") + index = get_index() + if not index: + sys.exit(1) + + existing_fields = {f["name"] for f in index.get("fields", [])} + print(f"Existing fields: {existing_fields}") + print() + + new_fields = [ + { + "name": "source_url", + "type": "Edm.String", + "searchable": False, + "filterable": True, + "retrievable": True, + "sortable": False, + "facetable": False, + "key": False + }, + { + "name": "source_type", + "type": "Edm.String", + "searchable": False, + "filterable": True, + "retrievable": True, + "sortable": False, + "facetable": True, + "key": False + }, + { + "name": "processed_at", + "type": "Edm.DateTimeOffset", + "searchable": False, + "filterable": True, + "retrievable": True, + "sortable": True, + "facetable": False, + "key": False + } + ] + + added = 0 + for field in new_fields: + if field["name"] in existing_fields: + print(f"⚠️ Already exists: {field['name']}") + else: + print(f"➕ Adding: {field['name']}") + index["fields"].append(field) + added += 1 + + if added == 0: + print("\n✅ All fields already present!") + return + + print(f"\n💾 Saving with {added} new fields...") + if update_index(index): + print("\n🎉 SUCCESS! URL tracking fields added.") + print("\nNext steps:") + print("1. Restart your Streamlit app") + print("2. Go to 'System Diagnostics' page") + print("3. 
Click 'Check Index Schema' to verify") + +if __name__ == "__main__": + main() diff --git a/scripts/box_shared_folder_manifest.py b/scripts/box_shared_folder_manifest.py index 310df6e..d44eab0 100644 --- a/scripts/box_shared_folder_manifest.py +++ b/scripts/box_shared_folder_manifest.py @@ -1,39 +1,17 @@ """ scripts/box_shared_folder_manifest.py - Generate Video Manifest from Box - -This script enumerates .m4a video files from a Box shared folder and generates -a manifest file (videos.jsonl) that lists all videos with their IDs and media URLs. -It creates open shared links for each file so Azure Speech Service can access them. - -Architecture Role: -- Pre-processing step before video ingestion -- Generates videos.jsonl input file for import_videos.py -- Handles Box API authentication and folder traversal -- Creates publicly accessible download URLs for Speech Service - -Usage: - python scripts/box_shared_folder_manifest.py - -Output: - - videos.jsonl: One JSON object per line with video_id and media_url - -Configuration (via .env): - - BOX_SHARED_FOLDER_URL: Box shared folder link - - BOX_TOKEN or BOX_ACCESS_TOKEN/BOX_REFRESH_TOKEN: Box authentication - - OUT_PATH: Output file path (default: videos.jsonl) - - RECURSIVE: Whether to traverse subfolders (default: 1) """ import json import os import requests +import time from typing import Dict, Any, List, Optional - from box_auth import get_access_token BOX_API = "https://api.box.com/2.0" -SHARED_FOLDER_URL = os.environ["BOX_SHARED_FOLDER_URL"] # https://...box.com/s/ +SHARED_FOLDER_URL = os.environ["BOX_SHARED_FOLDER_URL"] print("BOX_SHARED_FOLDER_URL =", SHARED_FOLDER_URL) OUT_PATH = os.environ.get("OUT_PATH", "videos.jsonl") RECURSIVE = os.environ.get("RECURSIVE", "1") == "1" @@ -63,7 +41,6 @@ def resolve_shared_folder(token: str) -> Dict[str, Any]: return r.json() - def list_folder_items(token: str, folder_id: str, limit: int = 1000, offset: int = 0) -> Dict[str, Any]: url = f"{BOX_API}/folders/{folder_id}/items" 
params = {"limit": limit, "offset": offset} @@ -72,31 +49,48 @@ def list_folder_items(token: str, folder_id: str, limit: int = 1000, offset: int return r.json() -def ensure_open_shared_link_for_file(token: str, file_id: str) -> str: +def ensure_open_shared_link_for_file(token: str, file_id: str, max_retries: int = 3) -> Optional[str]: """ - Ensure file has an open shared link and return a direct-download URL. + Ensure file has an open shared link with retry logic for timeouts. """ url = f"{BOX_API}/files/{file_id}" payload = {"shared_link": {"access": "open"}} - params = {"fields": "shared_link"} # IMPORTANT: ask for shared_link back - - r = requests.put(url, headers=auth_headers(token), params=params, json=payload, timeout=30) - r.raise_for_status() - data = r.json() - - sl = data.get("shared_link") or {} - # print("shared_link =", json.dumps(sl, indent=2)) - # Prefer direct static download URL - dl = sl.get("download_url") - if dl: - return dl - - # Fallback: at least return the shared link (may require cookies) - if sl.get("url"): - return sl["url"] - - raise RuntimeError(f"No shared_link returned for file {file_id}: {data}") - + params = {"fields": "shared_link"} + + for attempt in range(max_retries): + try: + # Increased timeout to 60 seconds + r = requests.put(url, headers=auth_headers(token), params=params, json=payload, timeout=60) + + if r.status_code == 404: + print(f"⚠️ Skipping file {file_id} (not found)") + return None + + r.raise_for_status() + data = r.json() + sl = data.get("shared_link") or {} + + dl = sl.get("download_url") + if dl: + return dl + if sl.get("url"): + return sl["url"] + + raise RuntimeError(f"No shared_link returned for file {file_id}: {data}") + + except requests.exceptions.Timeout: + if attempt < max_retries - 1: + wait_time = 2 ** attempt # Exponential backoff: 1, 2, 4 seconds + print(f"⏱️ Timeout on file {file_id}, retrying in {wait_time}s ({attempt + 1}/{max_retries})...") + time.sleep(wait_time) + else: + print(f"⚠️ Skipping 
file {file_id} (timeout after {max_retries} attempts)") + return None + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + print(f"⚠️ Skipping file {file_id} (not found)") + return None + raise def walk(token: str, folder_id: str) -> List[Dict[str, Any]]: @@ -114,6 +108,22 @@ def walk(token: str, folder_id: str) -> List[Dict[str, Any]]: return items +def load_existing_entries() -> set: + """Load already processed file IDs to avoid duplicates.""" + processed = set() + if os.path.exists(OUT_PATH): + with open(OUT_PATH, "r", encoding="utf-8") as f: + for line in f: + try: + entry = json.loads(line.strip()) + video_id = entry.get("video_id", "") + if video_id.startswith("vid_"): + processed.add(video_id[4:]) + except json.JSONDecodeError: + continue + return processed + + def main(): token = get_access_token() @@ -122,10 +132,16 @@ def main(): raise RuntimeError(f"Shared link did not resolve to a folder: {shared_folder.get('type')}") root_id = shared_folder["id"] + # Load already processed files to resume + processed_ids = load_existing_entries() + print(f"Resuming: {len(processed_ids)} files already processed") + queue = [root_id] - out_count = 0 + out_count = len(processed_ids) + skipped = 0 + new_files = 0 - with open(OUT_PATH, "w", encoding="utf-8") as f: + with open(OUT_PATH, "a", encoding="utf-8") as f: while queue: fid = queue.pop(0) entries = walk(token, fid) @@ -141,26 +157,36 @@ def main(): if et != "file": continue - if not lname.endswith(".m4a"): continue file_id = e["id"] + + # Skip if already processed + if file_id in processed_ids: + continue + video_id = f"vid_{file_id}" - # Make a per-file open shared link (Speech can fetch without auth). - # If your org disallows open links, this will fail — then you’ll need Blob staging. file_link = ensure_open_shared_link_for_file(token, file_id) + + if file_link is None: + skipped += 1 + continue - # Encourage direct download behavior - media_url = file_link # + ("?download=1" if "?" 
not in file_link else "&download=1") - + media_url = file_link f.write(json.dumps({"video_id": video_id, "media_url": media_url}) + "\n") + f.flush() # Ensure write is saved immediately out_count += 1 + new_files += 1 + if out_count % 10 == 0: - print(f"Wrote {out_count} entries...") + print(f"Wrote {out_count} entries total...") + + # Small delay to avoid rate limiting + time.sleep(0.2) - print(f"Done. Wrote {out_count} m4a entries to {OUT_PATH}") + print(f"Done. Total: {out_count} entries (new: {new_files}, skipped: {skipped})") if __name__ == "__main__": diff --git a/scripts/box_shared_folder_manifest.py.backup.20260205_170952 b/scripts/box_shared_folder_manifest.py.backup.20260205_170952 new file mode 100644 index 0000000..310df6e --- /dev/null +++ b/scripts/box_shared_folder_manifest.py.backup.20260205_170952 @@ -0,0 +1,167 @@ +""" +scripts/box_shared_folder_manifest.py - Generate Video Manifest from Box + +This script enumerates .m4a video files from a Box shared folder and generates +a manifest file (videos.jsonl) that lists all videos with their IDs and media URLs. +It creates open shared links for each file so Azure Speech Service can access them. 
+ +Architecture Role: +- Pre-processing step before video ingestion +- Generates videos.jsonl input file for import_videos.py +- Handles Box API authentication and folder traversal +- Creates publicly accessible download URLs for Speech Service + +Usage: + python scripts/box_shared_folder_manifest.py + +Output: + - videos.jsonl: One JSON object per line with video_id and media_url + +Configuration (via .env): + - BOX_SHARED_FOLDER_URL: Box shared folder link + - BOX_TOKEN or BOX_ACCESS_TOKEN/BOX_REFRESH_TOKEN: Box authentication + - OUT_PATH: Output file path (default: videos.jsonl) + - RECURSIVE: Whether to traverse subfolders (default: 1) +""" + +import json +import os +import requests +from typing import Dict, Any, List, Optional + +from box_auth import get_access_token + +BOX_API = "https://api.box.com/2.0" + +SHARED_FOLDER_URL = os.environ["BOX_SHARED_FOLDER_URL"] # https://...box.com/s/ +print("BOX_SHARED_FOLDER_URL =", SHARED_FOLDER_URL) +OUT_PATH = os.environ.get("OUT_PATH", "videos.jsonl") +RECURSIVE = os.environ.get("RECURSIVE", "1") == "1" + + +def shared_headers(token: str) -> Dict[str, str]: + return { + "Authorization": f"Bearer {token}", + "BoxApi": f"shared_link={SHARED_FOLDER_URL}", + "Content-Type": "application/json", + } + + +def auth_headers(token: str) -> Dict[str, str]: + return { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + } + + +def resolve_shared_folder(token: str) -> Dict[str, Any]: + url = f"{BOX_API}/shared_items" + h = shared_headers(token) + r = requests.get(url, headers=h, timeout=30) + if r.status_code != 200: + raise RuntimeError(f"{r.status_code} {r.text} (headers sent: {h.get('BoxApi')})") + return r.json() + + + +def list_folder_items(token: str, folder_id: str, limit: int = 1000, offset: int = 0) -> Dict[str, Any]: + url = f"{BOX_API}/folders/{folder_id}/items" + params = {"limit": limit, "offset": offset} + r = requests.get(url, headers=shared_headers(token), params=params, timeout=30) + 
r.raise_for_status() + return r.json() + + +def ensure_open_shared_link_for_file(token: str, file_id: str) -> str: + """ + Ensure file has an open shared link and return a direct-download URL. + """ + url = f"{BOX_API}/files/{file_id}" + payload = {"shared_link": {"access": "open"}} + params = {"fields": "shared_link"} # IMPORTANT: ask for shared_link back + + r = requests.put(url, headers=auth_headers(token), params=params, json=payload, timeout=30) + r.raise_for_status() + data = r.json() + + sl = data.get("shared_link") or {} + # print("shared_link =", json.dumps(sl, indent=2)) + # Prefer direct static download URL + dl = sl.get("download_url") + if dl: + return dl + + # Fallback: at least return the shared link (may require cookies) + if sl.get("url"): + return sl["url"] + + raise RuntimeError(f"No shared_link returned for file {file_id}: {data}") + + + +def walk(token: str, folder_id: str) -> List[Dict[str, Any]]: + items: List[Dict[str, Any]] = [] + offset = 0 + limit = 1000 + while True: + page = list_folder_items(token, folder_id, limit=limit, offset=offset) + entries = page.get("entries", []) + items.extend(entries) + total = page.get("total_count", 0) + offset += len(entries) + if offset >= total or not entries: + break + return items + + +def main(): + token = get_access_token() + + shared_folder = resolve_shared_folder(token) + if shared_folder.get("type") != "folder": + raise RuntimeError(f"Shared link did not resolve to a folder: {shared_folder.get('type')}") + root_id = shared_folder["id"] + + queue = [root_id] + out_count = 0 + + with open(OUT_PATH, "w", encoding="utf-8") as f: + while queue: + fid = queue.pop(0) + entries = walk(token, fid) + + for e in entries: + et = e.get("type") + name = (e.get("name") or "") + lname = name.lower() + + if et == "folder" and RECURSIVE: + queue.append(e["id"]) + continue + + if et != "file": + continue + + if not lname.endswith(".m4a"): + continue + + file_id = e["id"] + video_id = f"vid_{file_id}" + + # Make a 
per-file open shared link (Speech can fetch without auth). + # If your org disallows open links, this will fail — then you’ll need Blob staging. + file_link = ensure_open_shared_link_for_file(token, file_id) + + # Encourage direct download behavior + media_url = file_link # + ("?download=1" if "?" not in file_link else "&download=1") + + f.write(json.dumps({"video_id": video_id, "media_url": media_url}) + "\n") + out_count += 1 + if out_count % 10 == 0: + print(f"Wrote {out_count} entries...") + + print(f"Done. Wrote {out_count} m4a entries to {OUT_PATH}") + + +if __name__ == "__main__": + main() diff --git a/shared/__pycache__/speech_batch.cpython-311.pyc b/shared/__pycache__/speech_batch.cpython-311.pyc deleted file mode 100644 index 525fbbc..0000000 Binary files a/shared/__pycache__/speech_batch.cpython-311.pyc and /dev/null differ diff --git a/ui/Dockerfile b/ui/Dockerfile index e3a4bd9..8fd9d6c 100644 --- a/ui/Dockerfile +++ b/ui/Dockerfile @@ -4,6 +4,9 @@ WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt +RUN apt-get update && apt-get install -y ffmpeg +RUN pip install azure-cognitiveservices-speech + COPY . . 
EXPOSE 8501 diff --git a/ui/host.json b/ui/host.json new file mode 100644 index 0000000..3369578 --- /dev/null +++ b/ui/host.json @@ -0,0 +1,8 @@ +{ + "version": "2.0", + "isDefaultHostConfig": true, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} \ No newline at end of file diff --git a/ui/pages/2_Upload.py b/ui/pages/2_Upload.py new file mode 100644 index 0000000..1d6c15b --- /dev/null +++ b/ui/pages/2_Upload.py @@ -0,0 +1,479 @@ +""" +upload_transcribe.py - Upload & Transcribe page for the Video Annotation Platform +""" + +import sys +sys.path.append("..") # allows import from parent directory +import streamlit as st +import time +import pandas as pd +import io +import tempfile +import re +from typing import Tuple, Optional +from utils import ( + AZURE_STORAGE_KEY, + SPEECH_KEY, + POLL_SECONDS, + check_url_fields_status, + generate_video_id, + check_yt_dlp, + detect_url_type, + upload_to_azure_blob_sdk, + upload_to_azure_blob_fixed, + submit_transcription_direct, + poll_transcription_operation, + get_transcription_from_result, + process_transcription_to_segments, + save_segments_to_blob, + index_segments_direct, + ms_to_ts, + process_single_video, + download_youtube_audio +) + +st.header("Upload Video for Transcription") + +# Check URL fields status +url_status = check_url_fields_status() + +if url_status['fields_exist']: + st.success("✅ URL Tracking Enabled - Original source URLs will be stored") +else: + st.warning(f""" + ⚠️ **Partial URL Tracking** - Missing fields: {', '.join(url_status['missing_fields'])} + + Videos will still be processed, but URL information will be limited. + Add missing fields to your Azure Search index for full functionality. + """) + +# Check Azure configuration +azure_configured = bool(AZURE_STORAGE_KEY) and bool(SPEECH_KEY) +if not azure_configured: + st.error("⚠️ Azure Storage and Speech keys required. 
Check .env file.") + +# Source selection +source_type = st.radio("Select Source", + ["File Upload", "Direct URL", "YouTube", "📁 Batch CSV Upload"], + horizontal=True) + +media_url = None +video_id = None +file_bytes = None +yt_url = None +csv_df = None +detected_source_type = "unknown" + +# --- File Upload --- +if source_type == "File Upload": + if not azure_configured: + st.info("Please configure Azure Storage to enable file upload") + else: + uploaded_file = st.file_uploader( + "Choose video/audio file", + type=["mp4", "avi", "mov", "mkv", "m4a", "mp3", "wav"], + accept_multiple_files=False + ) + + if uploaded_file: + st.success(f"📁 {uploaded_file.name} ({uploaded_file.size / 1024 / 1024:.1f} MB)") + file_bytes = uploaded_file.getvalue() + video_id = generate_video_id(uploaded_file.name) + detected_source_type = "upload" + st.info("File ready for upload") + +# --- Direct URL --- +elif source_type == "Direct URL": + url_input = st.text_input("Media URL", placeholder="https://tulane.box.com/shared/static/ ...") + + if url_input.strip(): + media_url = url_input.strip() + video_id = generate_video_id(url_input) + detected_source_type = "direct" + st.success("✅ URL validated") + +# --- YouTube --- +elif source_type == "YouTube": + yt_url = st.text_input( + "YouTube URL", + placeholder="https://youtube.com/watch?v= ...", + value=st.session_state.yt_url_value, + key="yt_url_input" + ) + + # Update session state + if yt_url != st.session_state.yt_url_value: + st.session_state.yt_url_value = yt_url + try: + st.rerun() + except: + pass + + # Check yt-dlp + if not check_yt_dlp(): + st.warning("yt-dlp not installed") + if st.button("Install yt-dlp"): + with st.spinner("Installing..."): + import subprocess + subprocess.run(["pip", "install", "-q", "yt-dlp"]) + try: + st.rerun() + except: + st.info("Please refresh the page") + elif yt_url and yt_url.strip(): + video_id = generate_video_id(f"yt_{yt_url.strip()}") + detected_source_type = "youtube" + st.success("YouTube URL 
ready") + +# --- Batch CSV Upload --- +elif source_type == "📁 Batch CSV Upload": + st.subheader("📁 Batch Process Videos from CSV") + + csv_file = st.file_uploader( + "Upload CSV file", + type=["csv"], + help="CSV must contain a column with video URLs" + ) + + if csv_file: + try: + # Read CSV with flexible parsing + try: + csv_df = pd.read_csv(csv_file) + except Exception: + csv_file.seek(0) + csv_df = pd.read_csv(csv_file, header=None) + csv_df.columns = [f"column_{i}" for i in range(len(csv_df.columns))] + + # Handle case where column names are URLs + url_like_columns = [] + for col in csv_df.columns: + col_str = str(col).strip() + if detect_url_type(col_str) != "unknown": + url_like_columns.append(col) + + if url_like_columns and len(csv_df.columns) == 1: + url_col_name = csv_df.columns[0] + new_row = {url_col_name: url_col_name} + csv_df = pd.concat([pd.DataFrame([new_row]), csv_df], ignore_index=True) + + st.success(f"✅ Loaded CSV with {len(csv_df)} rows") + + # Column selection + url_column = st.selectbox("Select column containing video URLs", options=csv_df.columns.tolist()) + + id_column_options = ["Auto-generate"] + [c for c in csv_df.columns if c != url_column] + id_column = st.selectbox("Select column for custom Video ID (optional)", options=id_column_options, index=0) + + # Extract and validate URLs + urls_raw = csv_df[url_column].dropna().astype(str).tolist() + urls_to_process = [u.strip() for u in urls_raw if u.strip()] + + # Preview + with st.expander(f"Preview URLs ({len(urls_to_process)} found)"): + for i, url in enumerate(urls_to_process[:10], 1): + url_type = detect_url_type(url) + icon = "🎬" if url_type == "youtube" else "📄" if url_type == "direct" else "❓" + st.text(f"{i}. 
{icon} {url[:80]}...") + + # Validate + valid_urls = [] + invalid_urls = [] + for url in urls_to_process: + url_type = detect_url_type(str(url)) + if url_type in ["youtube", "direct"]: + valid_urls.append(url) + else: + invalid_urls.append(url) + + col1, col2, col3 = st.columns(3) + col1.metric("Total", len(urls_to_process)) + col2.metric("✅ Valid", len(valid_urls)) + col3.metric("❌ Invalid", len(invalid_urls)) + + # Store in session state + st.session_state['batch_urls'] = valid_urls + st.session_state['batch_df'] = csv_df + st.session_state['batch_url_column'] = url_column + st.session_state['batch_id_column'] = id_column + + except Exception as e: + st.error(f"Error reading CSV: {e}") + import traceback + st.error(traceback.format_exc()) + +# Custom ID input +custom_id = st.text_input("Custom Video ID (optional)") +if custom_id.strip() and source_type != "📁 Batch CSV Upload": + video_id = custom_id.strip() + +# Determine if we can process +can_process = False +if source_type == "File Upload": + can_process = file_bytes is not None and azure_configured +elif source_type == "Direct URL": + can_process = media_url is not None and len(str(media_url).strip()) > 0 +elif source_type == "YouTube": + yt_url_to_check = st.session_state.get('yt_url_value', '') + can_process = len(str(yt_url_to_check).strip()) > 0 and check_yt_dlp() +elif source_type == "📁 Batch CSV Upload": + can_process = (st.session_state.get('batch_urls') and + len(st.session_state.get('batch_urls', [])) > 0 and + azure_configured and + not st.session_state.get('batch_processing', False)) + +# Process button +button_text = "🚀 Start Transcription" +if source_type == "📁 Batch CSV Upload": + count = len(st.session_state.get('batch_urls', [])) + button_text = f"🚀 Process {count} Videos from CSV" + +if st.button(button_text, type="primary", disabled=not can_process): + + # --- BATCH PROCESSING --- + if source_type == "📁 Batch CSV Upload": + st.session_state.batch_processing = True + 
st.session_state.batch_results = [] + + urls = st.session_state.get('batch_urls', []) + csv_df = st.session_state.get('batch_df') + url_column = st.session_state.get('batch_url_column') + id_column = st.session_state.get('batch_id_column') + + total = len(urls) + st.info(f"Starting batch processing of {total} videos...") + + # Progress UI + overall_progress = st.progress(0) + status_text = st.empty() + results_container = st.container() + + results = [] + for idx, url in enumerate(urls, 1): + # Get custom ID if specified + custom_vid_id = None + if id_column != "Auto-generate": + row = csv_df[csv_df[url_column] == url] + if not row.empty: + custom_vid_id = str(row[id_column].iloc[0]) + custom_vid_id = re.sub(r'[^\w\s-]', '', custom_vid_id).strip().replace(' ', '_')[:50] + + # Detect source type + url_type = detect_url_type(url) + src_type = "youtube" if url_type == "youtube" else "direct" + + # Process + result = process_single_video( + url=url, + custom_id=custom_vid_id, + source_type=src_type, + progress_bar=overall_progress, + status_text=status_text, + overall_progress=(idx, total) + ) + + results.append(result) + st.session_state.batch_results = results + + # Update progress + progress_pct = int((idx / total) * 100) + overall_progress.progress(progress_pct) + + # Show result + with results_container: + if result['status'] == 'success': + url_stored = "✅ URL saved" if result.get('url_stored') else "⚠️ URL not stored" + st.success(f"✅ [{idx}/{total}] {result['video_id']}: {result['segments_count']} segments ({url_stored})") + else: + error_msg = result.get('error', 'Unknown error')[:200] + st.error(f"❌ [{idx}/{total}] Failed: {error_msg}...") + + time.sleep(1) # Rate limiting + + # Final summary + overall_progress.progress(100) + status_text.text("Batch processing complete!") + + successful = [r for r in results if r['status'] == 'success'] + failed = [r for r in results if r['status'] == 'failed'] + + st.markdown("---") + st.subheader("📊 Batch Processing 
Summary") + + col1, col2, col3 = st.columns(3) + col1.metric("Total", total) + col2.metric("Successful", len(successful), f"{len(successful)/total*100:.1f}%" if total > 0 else "0%") + col3.metric("Failed", len(failed), f"{len(failed)/total*100:.1f}%" if total > 0 else "0%") + + # Detailed results + with st.expander("View Detailed Results"): + results_df = pd.DataFrame([ + { + 'Video ID': r['video_id'], + 'URL': r['url'][:50] + "..." if len(r['url']) > 50 else r['url'], + 'Source Type': r.get('source_type', 'unknown'), + 'Status': r['status'], + 'Segments': r.get('segments_count', 0), + 'URL Stored': r.get('url_stored', False), + 'Indexing': r.get('index_status', 'N/A'), + 'Error': (r.get('error', '')[:100] + '...') if r.get('error') else '' + } + for r in results + ]) + st.dataframe(results_df) + + # Download results + csv_buffer = io.StringIO() + results_df.to_csv(csv_buffer, index=False) + st.download_button("Download Results CSV", csv_buffer.getvalue(), "batch_results.csv", "text/csv") + + # Search hint + if successful: + st.info("💡 **Search processed videos:**") + video_ids = [r['video_id'] for r in successful[:5]] + st.code(f"video_id:({' OR '.join(video_ids)})") + + st.session_state.batch_processing = False + + # --- SINGLE VIDEO PROCESSING --- + else: + progress_bar = st.progress(0) + status = st.empty() + + try: + # Upload file if needed + if source_type == "File Upload" and file_bytes: + progress_bar.progress(10) + status.text("Uploading to Azure Blob...") + + blob_name = f"upload_{video_id}_{int(time.time())}.m4a" + + sas_url, error = upload_to_azure_blob_sdk(file_bytes, blob_name) + if error and ("not installed" in error or "SDK" in error): + sas_url, error = upload_to_azure_blob_fixed(file_bytes, blob_name) + + if error: + raise Exception(error) + + media_url = sas_url + progress_bar.progress(50) + + # Download YouTube if needed + elif source_type == "YouTube": + yt_url = st.session_state.get('yt_url_value', '') + + if not yt_url or not yt_url.strip(): 
+ raise Exception("YouTube URL is empty") + + with tempfile.TemporaryDirectory() as tmpdir: + progress_bar.progress(10) + status.text("Downloading from YouTube...") + + output_path = f"{tmpdir}/youtube_{video_id}.m4a" + downloaded_path, error = download_youtube_audio(yt_url.strip(), output_path) + + if error: + raise Exception(error) + + progress_bar.progress(50) + status.text("Uploading to Azure Blob...") + + with open(downloaded_path, 'rb') as f: + file_bytes = f.read() + + blob_name = f"youtube_{video_id}_{int(time.time())}.m4a" + + sas_url, error = upload_to_azure_blob_sdk(file_bytes, blob_name) + if error and ("not installed" in error): + sas_url, error = upload_to_azure_blob_fixed(file_bytes, blob_name) + + if error: + raise Exception(error) + + media_url = sas_url + progress_bar.progress(75) + + if not media_url: + raise Exception("No media URL available") + + # Transcribe + status.text("Submitting to Azure Speech-to-Text...") + result = submit_transcription_direct(video_id, media_url) + operation_url = result.get("operation_url") + + if not operation_url: + raise Exception("No operation URL returned") + + # Poll + max_polls = 120 + transcription_data = None + + for i in range(max_polls): + time.sleep(POLL_SECONDS) + poll_result = poll_transcription_operation(operation_url) + status_text = poll_result.get("status", "unknown") + + progress = min(75 + int((i / max_polls) * 20), 95) + progress_bar.progress(progress) + status.text(f"Transcribing... 
({i * POLL_SECONDS // 60} min) - Status: {status_text}") + + if status_text.lower() == "succeeded": + transcription_data = get_transcription_from_result(poll_result) + break + elif status_text.lower() == "failed": + raise Exception(f"Transcription failed: {poll_result.get('properties', {}).get('error', {}).get('message', 'Unknown error')}") + + if not transcription_data: + raise Exception("Transcription timed out") + + # Process and index + progress_bar.progress(98) + status.text("Processing segments and indexing...") + + segments = process_transcription_to_segments(transcription_data, video_id) + + # Save to blob + save_segments_to_blob(video_id, segments) + + # Index with URL tracking + original_url = None + if source_type == "YouTube": + original_url = st.session_state.get('yt_url_value', '') + elif source_type == "Direct URL": + original_url = media_url + elif source_type == "File Upload": + original_url = f"uploaded_file://{video_id}" + + index_result = index_segments_direct( + video_id, + segments, + source_url=original_url, + source_type=detected_source_type + ) + + url_stored_msg = "✅ Source URL stored" if index_result.get('source_url_stored') else "⚠️ URL storage not available" + + progress_bar.progress(100) + status.text("Complete!") + + st.success(f""" + ✅ **Transcription Complete!** + - Video ID: {video_id} + - Segments: {len(segments)} + - Source Type: {detected_source_type} + - Indexed: {index_result.get('indexed', 0)} documents + - {url_stored_msg} + """) + + if original_url: + st.info(f"**Original Source:** [{original_url}]({original_url})") + + st.code(f'Search: video_id:{video_id}') + + with st.expander("View first 5 segments"): + for seg in segments[:5]: + st.write(f"**{ms_to_ts(seg['start_ms'])} - {ms_to_ts(seg['end_ms'])}:** {seg['text'][:100]}...") + + except Exception as e: + st.error(f"❌ Error: {str(e)}") + st.exception(e) \ No newline at end of file diff --git a/ui/pages/3_Manage_Videos.py b/ui/pages/3_Manage_Videos.py new file mode 100644 
index 0000000..3a08930 --- /dev/null +++ b/ui/pages/3_Manage_Videos.py @@ -0,0 +1,218 @@ +""" +manage_videos.py - Manage Videos page for the Video Annotation Platform +""" + +import sys +sys.path.append("..") +import streamlit as st +import pandas as pd +import io +import time +from utils import ( + SEARCH_ENDPOINT, + SEARCH_KEY, + check_url_fields_status, + get_stored_videos, + delete_video_by_id +) + +st.header("📚 Manage Stored Videos") +st.info("View, search, and manage all processed videos and their source URLs") + +if not SEARCH_ENDPOINT or not SEARCH_KEY: + st.error("Azure Search not configured. Cannot retrieve video list.") +else: + # Check URL fields status + url_status = check_url_fields_status() + + if url_status['fields_exist']: + st.success("✅ URL tracking fields are configured") + else: + st.warning(f"⚠️ Missing URL fields: {', '.join(url_status['missing_fields'])}") + + # URL coverage analysis + if st.button("📊 Analyze URL Data Coverage"): + with st.spinner("Analyzing..."): + all_videos = get_stored_videos(include_missing=True) + + with_urls = [v for v in all_videos if v.get('source_url') and v.get('source_type') not in ['', 'unknown']] + without_urls = [v for v in all_videos if not v.get('source_url') or v.get('source_type') in ['', 'unknown']] + + col1, col2, col3 = st.columns(3) + col1.metric("Total Videos", len(all_videos)) + col2.metric("✅ With URL Data", len(with_urls), f"{len(with_urls)/len(all_videos)*100:.1f}%" if all_videos else "0%") + col3.metric("⚠️ Missing URL Data", len(without_urls), f"{len(without_urls)/len(all_videos)*100:.1f}%" if all_videos else "0%") + + # By type breakdown + st.subheader("Breakdown by Source Type") + type_counts = {} + for v in all_videos: + t = v.get('source_type') or 'unknown' + type_counts[t] = type_counts.get(t, 0) + 1 + + cols = st.columns(len(type_counts) if type_counts else 1) + for i, (stype, count) in enumerate(sorted(type_counts.items())): + icon = "🎬" if stype == "youtube" else "📄" if stype == "direct" 
else "📁" if stype == "upload" else "❓" + cols[i % len(cols)].metric(f"{icon} {stype}", count) + + if without_urls: + with st.expander(f"Videos without URL data ({len(without_urls)})"): + st.info("These were likely processed before URL tracking was enabled") + for v in without_urls[:20]: + st.text(f"• {v.get('video_id')}") + + st.markdown("---") + + # Filters + st.subheader("Filter Videos") + col1, col2 = st.columns(2) + + with col1: + filter_video_id = st.text_input("Filter by Video ID (optional)") + with col2: + filter_options = ["All", "With URL Data Only", "Missing URL Data Only", "youtube", "direct", "upload", "unknown"] + filter_source_type = st.selectbox("Filter by Source Type", options=filter_options, index=0) + + # Load videos button + load_clicked = st.button("🔍 Load Videos", type="primary") + + # Handle deletion using session state + if st.session_state.video_to_delete: + vid_to_delete = st.session_state.video_to_delete + + with st.spinner(f"Deleting {vid_to_delete}..."): + success = delete_video_by_id(vid_to_delete) + + if success: + # Remove from cache immediately + if st.session_state.stored_videos_cache: + st.session_state.stored_videos_cache = [ + v for v in st.session_state.stored_videos_cache + if v.get('video_id') != vid_to_delete + ] + st.success(f"✅ Deleted {vid_to_delete}") + st.session_state.delete_success = True + else: + st.error(f"❌ Failed to delete {vid_to_delete}") + + # Clear the trigger + st.session_state.video_to_delete = None + time.sleep(0.5) + st.rerun() + + # Load videos if button clicked + if load_clicked: + with st.spinner("Retrieving videos..."): + + # Handle special filters + if filter_source_type == "Missing URL Data Only": + all_videos = get_stored_videos(include_missing=True) + videos = [v for v in all_videos if not v.get('source_url') or v.get('source_type') in ['', 'unknown']] + if filter_video_id.strip(): + videos = [v for v in videos if filter_video_id.strip().lower() in v.get('video_id', '').lower()] + elif 
filter_source_type == "With URL Data Only": + all_videos = get_stored_videos(include_missing=True) + videos = [v for v in all_videos if v.get('source_url') and v.get('source_type') not in ['', 'unknown']] + if filter_video_id.strip(): + videos = [v for v in videos if filter_video_id.strip().lower() in v.get('video_id', '').lower()] + else: + source_type = None if filter_source_type == "All" else filter_source_type + videos = get_stored_videos( + video_id=filter_video_id if filter_video_id.strip() else None, + source_type=source_type, + include_missing=True, + limit=1000 + ) + + st.session_state.stored_videos_cache = videos + st.session_state.videos_loaded = True + st.success(f"Found {len(videos)} videos") + + # Display videos + if st.session_state.stored_videos_cache: + videos = st.session_state.stored_videos_cache + + # Metrics + st.markdown("---") + cols = st.columns(4) + + type_counts = {} + for v in videos: + t = v.get('source_type') or 'unknown' + type_counts[t] = type_counts.get(t, 0) + 1 + + cols[0].metric("Total", len(videos)) + cols[1].metric("YouTube", type_counts.get('youtube', 0)) + cols[2].metric("Direct", type_counts.get('direct', 0)) + cols[3].metric("Upload", type_counts.get('upload', 0)) + + # Group by type + st.markdown("---") + st.subheader("Video List") + + videos_by_type = {} + for v in videos: + stype = v.get('source_type') or 'unknown' + if stype not in videos_by_type: + videos_by_type[stype] = [] + videos_by_type[stype].append(v) + + # Display by category + for source_type in ['youtube', 'direct', 'upload', 'unknown']: + if source_type not in videos_by_type: + continue + + type_videos = videos_by_type[source_type] + icon = "🎬" if source_type == "youtube" else "📄" if source_type == "direct" else "📁" if source_type == "upload" else "❓" + + with st.expander(f"{icon} {source_type.upper()} ({len(type_videos)} videos)", expanded=(source_type == 'youtube')): + for i, video in enumerate(type_videos, 1): + vid = video.get('video_id', 'unknown') + 
src_url = video.get('source_url', '') + processed = video.get('processed_at', 'unknown') + + has_url = bool(src_url) + status_icon = "✅" if has_url else "⚠️" + + with st.container(): + cols = st.columns([4, 1]) + + with cols[0]: + st.write(f"**{status_icon} {i}. {vid}**") + st.caption(f"Processed: {processed}") + + if src_url: + display_url = src_url[:80] + "..." if len(str(src_url)) > 80 else src_url + st.code(display_url) + if str(src_url).startswith('http'): + st.markdown(f"[Open Source ↗]({src_url})") + else: + st.warning("No source URL stored") + + with cols[1]: + # Capture current vid value for callback + st.button( + f"🗑️ Delete", + key=f"del_{vid}_{i}_{source_type}", + on_click=lambda v=vid: setattr(st.session_state, 'video_to_delete', v) + ) + + st.markdown("---") + + # Export + st.markdown("---") + if st.button("📥 Export to CSV"): + export_df = pd.DataFrame([ + { + 'video_id': v.get('video_id'), + 'source_type': v.get('source_type') or 'unknown', + 'source_url': v.get('source_url', ''), + 'has_url_data': bool(v.get('source_url')), + 'processed_at': v.get('processed_at', 'unknown') + } + for v in videos + ]) + + csv_buffer = io.StringIO() + export_df.to_csv(csv_buffer, index=False) + st.download_button("Download CSV", csv_buffer.getvalue(), "video_list.csv", "text/csv") \ No newline at end of file diff --git a/ui/pages/4_System_Diagnostics.py b/ui/pages/4_System_Diagnostics.py new file mode 100644 index 0000000..4d056c0 --- /dev/null +++ b/ui/pages/4_System_Diagnostics.py @@ -0,0 +1,75 @@ +""" +system_diagnostics.py - System Diagnostics page for the Video Annotation Platform +""" + +import sys +sys.path.append("..") +import streamlit as st +from utils import ( + SPEECH_KEY, AZURE_OPENAI_KEY, SEARCH_KEY, AZURE_STORAGE_KEY, SEARCH_FN_URL, + check_yt_dlp, debug_check_index_schema +) + +st.header("⚙️ System Diagnostics") +st.info("Check system configuration and troubleshoot issues") + +# Configuration status +st.subheader("Configuration Status") + 
+config_checks = { + "Azure Speech (SPEECH_KEY)": bool(SPEECH_KEY), + "Azure OpenAI (AZURE_OPENAI_KEY)": bool(AZURE_OPENAI_KEY), + "Azure Search (SEARCH_KEY)": bool(SEARCH_KEY), + "Azure Storage (AZURE_STORAGE_KEY)": bool(AZURE_STORAGE_KEY), + "Search Function (SEARCH_FN_URL)": bool(SEARCH_FN_URL), + "yt-dlp installed": check_yt_dlp() +} + +cols = st.columns(2) +for i, (name, status) in enumerate(config_checks.items()): + icon = "✅" if status else "❌" + cols[i % 2].write(f"{icon} {name}: {'OK' if status else 'Not configured'}") + +# Index schema check +st.markdown("---") +st.subheader("Index Schema Check") + +if st.button("🔍 Check Index Schema"): + with st.spinner("Fetching schema..."): + schema = debug_check_index_schema() + + if isinstance(schema, dict): + st.success(f"Index: {schema['index_name']}") + st.write(f"Key Field: `{schema['key_field']}`") + + if schema.get('has_all_url_fields'): + st.success("✅ All URL tracking fields present") + else: + st.warning(f"⚠️ Missing fields: {', '.join(schema.get('missing_url_fields', []))}") + + with st.expander("View all fields"): + for field in schema['fields']: + key = "🔑" if field['key'] else "" + url = "🔗" if 'url' in field['name'].lower() else "" + facet = "📊" if field.get('facetable') else "" + st.caption(f"{key}{url}{facet} `{field['name']}` ({field['type']}) - facetable: {field.get('facetable', False)}") + + st.session_state.index_schema_cache = schema + else: + st.error(f"Schema check failed: {schema}") + +# Debug info +st.markdown("---") +st.subheader("Debug Information") + +with st.expander("Session State"): + st.json({ + k: str(v)[:100] + "..." if len(str(v)) > 100 else v + for k, v in st.session_state.items() + }) + +with st.expander("Recent Processing Debug"): + if st.session_state.get('debug_info'): + st.json(st.session_state['debug_info']) + else: + st.info("No debug info yet. 
Process a video first.") \ No newline at end of file diff --git a/ui/ui_search.py b/ui/ui_search.py index f3b7125..546a83e 100644 --- a/ui/ui_search.py +++ b/ui/ui_search.py @@ -1,53 +1,46 @@ """ -ui_search.py - Streamlit Web Interface for Video Segment Search - -This Streamlit application provides a user-friendly web interface for searching -indexed video segments. Users can: -- Enter text queries to search across all indexed segments -- Choose search mode (keyword, vector, or hybrid) -- Filter results by video_id and adjust result count -- View segment text with timestamps and relevance scores - -Architecture Role: -- Frontend user interface for the video annotation system -- Deployed as Azure Container App (video-annotator-ui) -- Calls SearchSegments Azure Function for all search operations -- Displays results with formatted timestamps and metadata - -Deployment: - - Local: python -m streamlit run ui_search.py - - Azure: Deployed as Container App (see ui/README.md) - -Configuration (via .env or Container App env vars): - - SEARCH_FN_URL: SearchSegments function endpoint - - DEFAULT_MODE: Default search mode (hybrid/keyword/vector) - - DEFAULT_TOP: Default number of results - - DEFAULT_K: Default vector recall depth +ui_search.py - Main Streamlit entry point +Uses multipage navigation (pages folder) and shared utilities from utils.py """ import os import requests import streamlit as st +from datetime import datetime, timezone, timedelta +from typing import Optional, Dict, Any, Tuple, List from dotenv import load_dotenv -# Load .env locally (Container Apps/App Service will use real env vars) -load_dotenv() +# Import shared utilities (must be in same directory) +from utils import ms_to_ts, SEARCH_FN_URL -SEARCH_FN_URL = os.environ["SEARCH_FN_URL"] -DEFAULT_MODE = os.environ.get("DEFAULT_MODE", "hybrid") -DEFAULT_TOP = int(os.environ.get("DEFAULT_TOP", "10")) -DEFAULT_K = int(os.environ.get("DEFAULT_K", "40")) +load_dotenv() st.set_page_config(page_title="Video 
Segment Search", layout="wide") - -def ms_to_ts(ms: int) -> str: - s = max(0, int(ms // 1000)) - m, s = divmod(s, 60) - h, m = divmod(m, 60) - return f"{h}:{m:02d}:{s:02d}" if h else f"{m}:{s:02d}" - - +# ============================================================================= +# SESSION STATE INITIALIZATION +# ============================================================================= +# All session variables used across pages must be initialized here +defaults = { + 'yt_url_value': "", + 'batch_results': [], + 'batch_processing': False, + 'index_schema_cache': None, + 'stored_videos_cache': None, + 'url_fields_status': None, + 'debug_info': {}, + 'video_to_delete': None, + 'delete_success': False, + 'videos_loaded': False, + 'debug_poll_url': None +} +for key, value in defaults.items(): + if key not in st.session_state: + st.session_state[key] = value + +# ----------------------------------------------------------------------------- +# Search‑specific functions +# ----------------------------------------------------------------------------- def call_search_api(payload: dict) -> dict: r = requests.post( SEARCH_FN_URL, @@ -68,12 +61,10 @@ def render_search_page() -> None: mode = st.selectbox( "Mode", ["keyword", "hybrid", "vector"], - index=["keyword", "hybrid", "vector"].index(DEFAULT_MODE) - if DEFAULT_MODE in ("keyword", "hybrid", "vector") - else 1, + index=1 ) - top = st.slider("Top", 1, 50, DEFAULT_TOP) - k = st.slider("Vector k (hybrid/vector)", 5, 200, DEFAULT_K) + top = st.slider("Top", 1, 50, 10) + k = st.slider("Vector k (hybrid/vector)", 5, 200, 40) video_id_filter = st.text_input("Filter by video_id (optional)", value="") st.caption("Tip: keep k ~ 4×top for hybrid.") @@ -112,11 +103,9 @@ def render_search_page() -> None: with st.expander(header, expanded=(i <= 3)): st.write(h.get("text", "")) - labels = h.get("pred_labels") or [] conf = h.get("pred_confidence") rationale = h.get("pred_rationale") - if labels or conf is not None or rationale: if 
labels: st.write("**Labels:**", ", ".join(labels)) @@ -126,7 +115,13 @@ def render_search_page() -> None: st.write("**Rationale:**", rationale) +# ----------------------------------------------------------------------------- +# Multipage navigation +# ----------------------------------------------------------------------------- pg_search = st.Page(render_search_page, title="Search", icon="🔎", default=True) pg_labels = st.Page("pages/1_Label_Management.py", title="Label Management", icon="🏷️") +pg_upload = st.Page("pages/2_Upload.py", title="Upload", icon="⬆️") +pg_manage = st.Page("pages/3_Manage_Videos.py", title="Manage Videos", icon="📚") +pg_diag = st.Page("pages/4_System_Diagnostics.py", title="System Diagnostics", icon="⚙️") -st.navigation([pg_search, pg_labels]).run() +st.navigation([pg_search, pg_labels, pg_upload, pg_manage, pg_diag]).run() \ No newline at end of file diff --git a/ui/utils.py b/ui/utils.py new file mode 100644 index 0000000..24fc125 --- /dev/null +++ b/ui/utils.py @@ -0,0 +1,797 @@ +""" +utils.py - Shared utilities for Video Annotation Platform +""" + +import os +import requests +import streamlit as st +import json +import time +import re +import subprocess +import hashlib +import base64 +import hmac +import urllib.parse +from datetime import datetime, timezone, timedelta +from typing import Optional, Dict, Any, Tuple, List +from pathlib import Path +from dotenv import load_dotenv + +load_dotenv() + +__all__ = [ + "SEARCH_FN_URL", "SPEECH_KEY", "SPEECH_REGION", "SPEECH_API_VERSION", + "AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_KEY", "AZURE_OPENAI_DEPLOYMENT", + "SEARCH_ENDPOINT", "SEARCH_KEY", "SEARCH_INDEX_NAME", + "AZURE_STORAGE_ACCOUNT", "AZURE_STORAGE_KEY", "INPUT_CONTAINER", "SEGMENTS_CONTAINER", + "POLL_SECONDS", + "ms_to_ts", "sanitize_id", "detect_url_type", "check_yt_dlp", + "debug_check_index_schema", "get_index_schema", "check_url_fields_status", + "submit_transcription_direct", "poll_transcription_operation", 
def ms_to_ts(ms: int) -> str:
    """Format a millisecond offset as a human-readable timestamp.

    Returns ``H:MM:SS`` once the duration reaches an hour, otherwise
    ``M:SS``. Negative inputs clamp to ``0:00``.
    """
    total_seconds = max(0, int(ms // 1000))
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours}:{minutes:02d}:{seconds:02d}" if hours else f"{minutes}:{seconds:02d}"

def sanitize_id(id_string: str) -> str:
    """Normalize an arbitrary string into a safe Azure Search document key.

    Keeps word characters and hyphens, collapses everything else to single
    underscores, and guarantees a non-empty result. Keys longer than the
    Azure Search 1024-char limit are truncated with an md5 suffix so two
    distinct long inputs cannot collide after truncation.
    """
    if not id_string:
        id_string = "unknown"
    sanitized = re.sub(r'[^\w\-]', '_', str(id_string))
    sanitized = re.sub(r'_+', '_', sanitized)  # collapse runs of underscores
    sanitized = sanitized.strip('_')
    if not sanitized:
        sanitized = "unknown"
    # After strip('_') a leading '_' is impossible, but a leading '-' is not;
    # Azure Search keys may not start with '-' so prefix those.
    if sanitized.startswith('_') or sanitized.startswith('-'):
        sanitized = 'id' + sanitized
    if len(sanitized) > 1024:
        # md5 here is a non-cryptographic fingerprint, not a security measure.
        hash_suffix = hashlib.md5(sanitized.encode()).hexdigest()[:16]
        sanitized = sanitized[:1000] + "_" + hash_suffix
    return sanitized

def detect_url_type(url: str) -> str:
    """Classify a media URL as ``"youtube"``, ``"direct"``, or ``"unknown"``.

    "direct" means a URL we expect to point at a downloadable media file:
    either it ends with a known media extension or it lives on a known
    cloud-storage host. Empty / unrecognized URLs return "unknown".
    """
    if not url:
        return "unknown"
    url_lower = str(url).lower().strip()
    youtube_patterns = [
        r'(?:https?:\/\/)?(?:www\.)?(?:youtube\.com|youtu\.be)',
        r'youtube\.com\/watch\?v=',
        r'youtu\.be\/',
        r'youtube\.com\/shorts\/'
    ]
    for pattern in youtube_patterns:
        if re.search(pattern, url_lower):
            return "youtube"
    media_extensions = ['.mp4', '.m4a', '.mp3', '.wav', '.mov', '.avi', '.mkv', '.webm']
    if any(url_lower.endswith(ext) for ext in media_extensions):
        return "direct"
    cloud_patterns = ['box.com', 'drive.google.com', 'dropbox.com', 'onedrive']
    if any(pattern in url_lower for pattern in cloud_patterns):
        return "direct"
    return "unknown"

def check_yt_dlp() -> bool:
    """Return True when the ``yt-dlp`` executable is available on PATH.

    Fixed: the previous implementation shelled out to the POSIX ``which``
    binary under a bare ``except:`` — non-portable (fails on Windows) and
    silently swallowed every error. ``shutil.which`` is cross-platform and
    cannot raise.
    """
    import shutil  # local import keeps this fix self-contained
    return shutil.which("yt-dlp") is not None
+ field_info = { + "name": field.get("name"), + "type": field.get("type"), + "key": field.get("key", False), + "retrievable": field.get("retrievable", False), + "filterable": field.get("filterable", False), + "sortable": field.get("sortable", False), + "facetable": field.get("facetable", False) + } + fields_info.append(field_info) + if field.get("key", False): + key_field = field.get("name") + if field.get("name") in url_fields: + found_url_fields.append(field.get("name")) + return { + "index_name": schema.get("name"), + "key_field": key_field, + "fields": fields_info, + "found_url_fields": found_url_fields, + "missing_url_fields": list(set(url_fields) - set(found_url_fields)), + "has_all_url_fields": len(found_url_fields) == len(url_fields) + } + else: + return f"Index check failed: HTTP {r.status_code}" + except Exception as e: + return f"Error checking index: {str(e)}" + +def get_index_schema(): + if st.session_state.get('index_schema_cache'): + return st.session_state.index_schema_cache + schema_info = debug_check_index_schema() + if isinstance(schema_info, dict): + st.session_state.index_schema_cache = schema_info + return schema_info + else: + raise RuntimeError(f"Cannot fetch index schema: {schema_info}") + +def check_url_fields_status(): + if st.session_state.get('url_fields_status'): + return st.session_state.url_fields_status + try: + schema = get_index_schema() + if isinstance(schema, dict): + result = { + 'fields_exist': schema.get('has_all_url_fields', False), + 'found_fields': schema.get('found_url_fields', []), + 'missing_fields': schema.get('missing_url_fields', []), + 'key_field': schema.get('key_field') + } + st.session_state.url_fields_status = result + return result + except: + pass + return { + 'fields_exist': False, + 'found_fields': [], + 'missing_fields': ['source_url', 'source_type', 'processed_at'], + 'key_field': None + } + +# ============================================================================= +# AZURE SPEECH API FUNCTIONS +# 
def submit_transcription_direct(video_id: str, media_url: str) -> Dict[str, Any]:
    """Submit a batch transcription job to the Azure Speech REST API.

    Returns ``{"operation_url": ..., "video_id": ...}`` for later polling.
    Raises RuntimeError when SPEECH_KEY is missing, when the service does
    not hand back an operation URL, or on an HTTP error response.
    """
    if not SPEECH_KEY:
        raise RuntimeError("SPEECH_KEY not configured")

    submit_url = f"https://{SPEECH_REGION}.api.cognitive.microsoft.com/speechtotext/transcriptions:submit?api-version={SPEECH_API_VERSION}"
    request_headers = {
        "Ocp-Apim-Subscription-Key": SPEECH_KEY,
        "Content-Type": "application/json"
    }
    body = {
        "contentUrls": [media_url],
        "locale": "en-US",
        "displayName": f"transcription_{video_id}",
        "properties": {
            "diarizationEnabled": False,
            "wordLevelTimestampsEnabled": False,
            "punctuationMode": "DictatedAndAutomatic",
            "profanityFilterMode": "Masked",
            "timeToLiveHours": 24
        }
    }
    try:
        resp = requests.post(submit_url, headers=request_headers, json=body, timeout=60)
        resp.raise_for_status()
        # Prefer the Location header; some API versions return the URL in the body.
        op_url = resp.headers.get("Location")
        if not op_url:
            parsed = resp.json()
            op_url = parsed.get("self") or parsed.get("links", {}).get("self")
        if not op_url:
            raise RuntimeError("No operation URL returned")
        return {"operation_url": op_url, "video_id": video_id}
    except requests.exceptions.HTTPError as e:
        if resp.status_code == 401:
            message = "Azure Speech API authentication failed. Check SPEECH_KEY."
        else:
            message = f"Speech API error {resp.status_code}: {resp.text}"
        raise RuntimeError(message)

def poll_transcription_operation(operation_url: str) -> Dict[str, Any]:
    """Poll a transcription operation and return its current JSON status.

    The submit endpoint's Location header points at ``.../transcriptions:submit/...``;
    status polling uses the plain ``.../transcriptions/...`` resource, so the
    URL is rewritten before the GET.
    """
    if not SPEECH_KEY:
        raise RuntimeError("SPEECH_KEY not configured")
    auth_header = {"Ocp-Apim-Subscription-Key": SPEECH_KEY}
    try:
        status_url = operation_url.replace("/transcriptions:submit/", "/transcriptions/")
        st.session_state['debug_poll_url'] = status_url  # surfaced on the diagnostics page
        resp = requests.get(status_url, headers=auth_header, timeout=30)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Failed to poll transcription: {str(e)}")

def get_transcription_from_result(result_data: Dict) -> Dict[str, Any]:
    """Download the transcription JSON referenced by a finished operation.

    Follows the operation's ``links.files`` listing, fetches the first file of
    kind ``Transcription`` that carries a content URL, and returns its parsed
    body. Falls back to ``result_data`` itself when it already contains
    inlined ``combinedRecognizedPhrases``.
    """
    if not SPEECH_KEY:
        raise RuntimeError("SPEECH_KEY not configured")
    auth_header = {"Ocp-Apim-Subscription-Key": SPEECH_KEY}
    try:
        files_url = result_data.get("links", {}).get("files")
        if not files_url:
            if "combinedRecognizedPhrases" in result_data:
                return result_data
            raise RuntimeError("No files URL in result")
        listing_resp = requests.get(files_url, headers=auth_header, timeout=30)
        listing_resp.raise_for_status()
        for entry in listing_resp.json().get("values", []):
            if entry.get("kind") != "Transcription":
                continue
            content_url = entry.get("links", {}).get("contentUrl")
            if content_url:
                payload_resp = requests.get(content_url, timeout=60)
                payload_resp.raise_for_status()
                return payload_resp.json()
        raise RuntimeError("No transcription file found")
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Failed to get transcription result: {str(e)}")
f"{AZURE_OPENAI_ENDPOINT}/openai/deployments/{AZURE_OPENAI_DEPLOYMENT}/embeddings?api-version=2024-02-01" + headers = {"api-key": AZURE_OPENAI_KEY, "Content-Type": "application/json"} + payload = {"input": texts, "model": "text-embedding-3-small"} + try: + r = requests.post(url, headers=headers, json=payload, timeout=60) + r.raise_for_status() + result = r.json() + return [item["embedding"] for item in result["data"]] + except Exception as e: + raise RuntimeError(f"Embedding failed: {str(e)}") + +def index_segments_direct(video_id: str, segments: list, source_url: str = None, source_type: str = None) -> Dict[str, Any]: + if not SEARCH_ENDPOINT or not SEARCH_KEY: + raise RuntimeError("Azure Search not configured") + schema_info = get_index_schema() + key_field = schema_info.get("key_field") + available_fields = {f.get("name") for f in schema_info.get("fields", [])} + if not key_field: + raise RuntimeError("No key field found") + url_fields_available = { + 'source_url': 'source_url' in available_fields, + 'source_type': 'source_type' in available_fields, + 'processed_at': 'processed_at' in available_fields + } + texts = [seg.get("text", "") for seg in segments] + try: + embeddings = get_embeddings(texts) + except Exception as e: + st.warning(f"Embedding failed, indexing without vectors: {e}") + embeddings = [None] * len(segments) + documents = [] + processed_timestamp = datetime.utcnow().isoformat() + "Z" + for i, (seg, embedding) in enumerate(zip(segments, embeddings)): + safe_video_id = sanitize_id(video_id) + doc_id = f"{safe_video_id}_{i}" + doc = {"@search.action": "upload", key_field: doc_id} + field_mappings = { + "video_id": safe_video_id, + "segment_id": str(seg.get("segment_id", i)), + "text": str(seg.get("text", "")), + "start_ms": int(seg.get("start_ms", 0)), + "end_ms": int(seg.get("end_ms", 0)), + "pred_labels": seg.get("pred_labels", []) if seg.get("pred_labels") else [] + } + if url_fields_available['source_url']: + field_mappings["source_url"] = 
def process_transcription_to_segments(transcription_data: Dict, video_id: str) -> list:
    """Flatten an Azure Speech transcription payload into segment dicts.

    Each entry of ``recognizedPhrases`` becomes one segment. Azure reports
    time in ticks (100 ns units), so ``// 10000`` converts to milliseconds.
    The display text comes from the top nBest hypothesis when present,
    otherwise the segment text is empty. ``pred_labels`` starts empty and is
    filled in by later labeling stages.
    """
    segments = []
    for idx, phrase in enumerate(transcription_data.get("recognizedPhrases", [])):
        start_ms = phrase.get("offsetInTicks", 0) // 10000
        duration_ms = phrase.get("durationInTicks", 0) // 10000
        hypotheses = phrase.get("nBest", [])
        display_text = hypotheses[0].get("display", "") if hypotheses else ""
        segments.append({
            "segment_id": idx,
            "video_id": video_id,
            "text": display_text,
            "start_ms": start_ms,
            "end_ms": start_ms + duration_ms,
            "pred_labels": []
        })
    return segments
============================================================================= +# VIDEO RETRIEVAL AND DELETION +# ============================================================================= + +def get_stored_videos(video_id: str = None, source_type: str = None, + include_missing: bool = True, limit: int = 1000) -> List[Dict]: + if not SEARCH_ENDPOINT or not SEARCH_KEY: + return [] + url = f"{SEARCH_ENDPOINT}/indexes/{SEARCH_INDEX_NAME}/docs/search?api-version=2024-07-01" + headers = {"api-key": SEARCH_KEY, "Content-Type": "application/json"} + try: + schema = get_index_schema() + available_fields = {f['name'] for f in schema.get('fields', [])} + except: + available_fields = set() + filters = [] + if video_id: + escaped_id = video_id.replace("'", "''") + filters.append(f"video_id eq '{escaped_id}'") + if source_type and source_type != "All": + escaped_type = source_type.replace("'", "''") + filters.append(f"source_type eq '{escaped_type}'") + filter_query = " and ".join(filters) if filters else None + select_fields = ["video_id"] + optional_fields = ["source_url", "source_type", "processed_at"] + for field in optional_fields: + if not available_fields or field in available_fields: + select_fields.append(field) + all_videos = {} + skip = 0 + batch_size = 1000 + max_iterations = 100 + try: + for iteration in range(max_iterations): + payload = { + "search": "*", + "select": ",".join(select_fields), + "top": batch_size, + "skip": skip, + "count": True + } + if filter_query: + payload["filter"] = filter_query + if "processed_at" in available_fields: + payload["orderby"] = "processed_at desc" + r = requests.post(url, headers=headers, json=payload, timeout=30) + r.raise_for_status() + data = r.json() + docs = data.get("value", []) + total_count = data.get("@odata.count", 0) + if not docs: + break + for doc in docs: + vid = doc.get('video_id') + if vid and vid not in all_videos: + all_videos[vid] = { + 'video_id': vid, + 'source_type': doc.get('source_type') or 'unknown', 
+ 'source_url': doc.get('source_url', ''), + 'processed_at': doc.get('processed_at', 'unknown') + } + skip += len(docs) + if skip >= total_count or len(docs) < batch_size: + break + videos = list(all_videos.values())[:limit] + return videos + except Exception as e: + st.error(f"Failed to retrieve videos: {e}") + return [] + +def delete_video_by_id(video_id: str) -> bool: + if not SEARCH_ENDPOINT or not SEARCH_KEY: + return False + search_url = f"{SEARCH_ENDPOINT}/indexes/{SEARCH_INDEX_NAME}/docs/search?api-version=2024-07-01" + headers = {"api-key": SEARCH_KEY, "Content-Type": "application/json"} + escaped_id = video_id.replace("'", "''") + payload = { + "search": "*", + "filter": f"video_id eq '{escaped_id}'", + "select": "video_id", + "top": 1000 + } + try: + r = requests.post(search_url, headers=headers, json=payload, timeout=30) + r.raise_for_status() + docs = r.json().get("value", []) + if not docs: + return False + schema = get_index_schema() + key_field = schema.get('key_field', 'id') + delete_docs = [] + for doc in docs: + doc_key = doc.get(key_field) or doc.get('id') + if doc_key: + delete_docs.append({ + "@search.action": "delete", + key_field: doc_key + }) + if not delete_docs: + return False + delete_url = f"{SEARCH_ENDPOINT}/indexes/{SEARCH_INDEX_NAME}/docs/index?api-version=2024-07-01" + r = requests.post(delete_url, headers=headers, json={"value": delete_docs}, timeout=60) + r.raise_for_status() + return True + except Exception as e: + st.error(f"Delete failed: {e}") + return False + +# ============================================================================= +# AZURE STORAGE FUNCTIONS +# ============================================================================= + +def generate_video_id(filename: str) -> str: + clean_name = Path(filename).stem + clean_name = re.sub(r'[^\w\s-]', '', clean_name) + clean_name = re.sub(r'[-\s]+', '_', clean_name) + hash_suffix = hashlib.md5(clean_name.encode()).hexdigest()[:8] + return 
f"vid_{clean_name[:50]}_{hash_suffix}" + +def test_sas_url(sas_url: str) -> Tuple[bool, str]: + try: + r = requests.head(sas_url, timeout=10, allow_redirects=True) + return (r.status_code == 200, f"HTTP {r.status_code}") + except Exception as e: + return (False, str(e)) + +def generate_sas_token_fixed(blob_name: str, expiry_hours: int = 24) -> Optional[str]: + if not AZURE_STORAGE_KEY: + return None + try: + expiry = datetime.now(timezone.utc) + timedelta(hours=expiry_hours) + expiry_str = expiry.strftime('%Y-%m-%dT%H:%M:%SZ') + account_key = base64.b64decode(AZURE_STORAGE_KEY) + canonicalized_resource = f"/blob/{AZURE_STORAGE_ACCOUNT}/{INPUT_CONTAINER}/{blob_name}" + string_to_sign = ( + f"r\n\n{expiry_str}\n{canonicalized_resource}\n\n\nhttps\n2020-12-06\nb\n\n\n\n\n\n\n" + ) + signed_hmac = hmac.new(account_key, string_to_sign.encode('utf-8'), hashlib.sha256).digest() + signature = base64.b64encode(signed_hmac).decode('utf-8') + sas_params = { + 'sv': '2020-12-06', + 'sr': 'b', + 'sp': 'r', + 'se': expiry_str, + 'spr': 'https', + 'sig': signature + } + return '&'.join([f"{k}={urllib.parse.quote(v, safe='')}" for k, v in sas_params.items()]) + except Exception as e: + st.error(f"SAS generation error: {e}") + return None + +def upload_to_azure_blob_fixed(file_bytes: bytes, blob_name: str) -> Tuple[Optional[str], Optional[str]]: + if not AZURE_STORAGE_KEY: + return None, "Azure Storage key not configured" + try: + url = f"https://{AZURE_STORAGE_ACCOUNT}.blob.core.windows.net/{INPUT_CONTAINER}/{blob_name}" + date_str = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') + content_length = len(file_bytes) + string_to_sign = ( + f"PUT\n\n\n{content_length}\n\napplication/octet-stream\n\n\n\n\n\n\n" + f"x-ms-blob-type:BlockBlob\nx-ms-date:{date_str}\nx-ms-version:2020-12-06\n" + f"/{AZURE_STORAGE_ACCOUNT}/{INPUT_CONTAINER}/{blob_name}" + ) + account_key = base64.b64decode(AZURE_STORAGE_KEY) + signed_hmac = hmac.new(account_key, string_to_sign.encode('utf-8'), 
hashlib.sha256).digest() + signature = base64.b64encode(signed_hmac).decode('utf-8') + headers = { + "x-ms-date": date_str, + "x-ms-version": "2020-12-06", + "x-ms-blob-type": "BlockBlob", + "Content-Type": "application/octet-stream", + "Content-Length": str(content_length), + "Authorization": f"SharedKey {AZURE_STORAGE_ACCOUNT}:{signature}" + } + r = requests.put(url, data=file_bytes, headers=headers, timeout=300) + if r.status_code not in [201, 200]: + return None, f"Upload failed: HTTP {r.status_code}" + sas_token = generate_sas_token_fixed(blob_name) + if not sas_token: + return None, "Failed to generate SAS token" + sas_url = f"{url}?{sas_token}" + is_valid, test_msg = test_sas_url(sas_url) + if not is_valid: + return None, f"SAS URL validation failed: {test_msg}" + return sas_url, None + except Exception as e: + return None, f"Upload error: {str(e)}" + +def upload_to_azure_blob_sdk(file_bytes: bytes, blob_name: str) -> Tuple[Optional[str], Optional[str]]: + try: + from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions + connection_string = ( + f"DefaultEndpointsProtocol=https;" + f"AccountName={AZURE_STORAGE_ACCOUNT};" + f"AccountKey={AZURE_STORAGE_KEY};" + f"EndpointSuffix=core.windows.net" + ) + blob_service = BlobServiceClient.from_connection_string(connection_string) + container_client = blob_service.get_container_client(INPUT_CONTAINER) + try: + container_client.create_container() + except Exception: + pass + blob_client = container_client.get_blob_client(blob_name) + blob_client.upload_blob(file_bytes, overwrite=True) + sas_token = generate_blob_sas( + account_name=AZURE_STORAGE_ACCOUNT, + container_name=INPUT_CONTAINER, + blob_name=blob_name, + account_key=AZURE_STORAGE_KEY, + permission=BlobSasPermissions(read=True), + expiry=datetime.now(timezone.utc) + timedelta(hours=24), + protocol="https" + ) + sas_url = f"https://{AZURE_STORAGE_ACCOUNT}.blob.core.windows.net/{INPUT_CONTAINER}/{blob_name}?{sas_token}" + is_valid, 
test_msg = test_sas_url(sas_url) + if not is_valid: + return None, f"SAS URL validation failed: {test_msg}" + return sas_url, None + except ImportError: + return None, "azure-storage-blob not installed" + except Exception as e: + return None, f"SDK upload failed: {str(e)}" + +def save_segments_to_blob(video_id: str, segments: list) -> str: + if not AZURE_STORAGE_KEY: + raise RuntimeError("Azure Storage key not configured") + blob_name = f"{video_id}_segments.json" + url = f"https://{AZURE_STORAGE_ACCOUNT}.blob.core.windows.net/{SEGMENTS_CONTAINER}/{blob_name}" + json_bytes = json.dumps(segments, indent=2).encode('utf-8') + content_length = len(json_bytes) + date_str = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') + string_to_sign = ( + f"PUT\n\n\n{content_length}\n\napplication/json\n\n\n\n\n\n\n" + f"x-ms-blob-type:BlockBlob\nx-ms-date:{date_str}\nx-ms-version:2020-12-06\n" + f"/{AZURE_STORAGE_ACCOUNT}/{SEGMENTS_CONTAINER}/{blob_name}" + ) + account_key = base64.b64decode(AZURE_STORAGE_KEY) + signed_hmac = hmac.new(account_key, string_to_sign.encode('utf-8'), hashlib.sha256).digest() + signature = base64.b64encode(signed_hmac).decode('utf-8') + headers = { + "x-ms-date": date_str, + "x-ms-version": "2020-12-06", + "x-ms-blob-type": "BlockBlob", + "Content-Type": "application/json", + "Content-Length": str(content_length), + "Authorization": f"SharedKey {AZURE_STORAGE_ACCOUNT}:{signature}" + } + r = requests.put(url, data=json_bytes, headers=headers, timeout=60) + r.raise_for_status() + return blob_name + +def download_youtube_audio(youtube_url: str, output_path: str, + progress_callback=None) -> Tuple[Optional[str], Optional[str]]: + if not check_yt_dlp(): + return None, "yt-dlp not installed" + if not youtube_url or not youtube_url.strip(): + return None, "YouTube URL is empty" + try: + cmd = [ + "yt-dlp", + "-f", "bestaudio[ext=m4a]/bestaudio", + "--extract-audio", + "--audio-format", "m4a", + "--audio-quality", "0", + "--no-check-certificate", + 
"--no-warnings", + "-o", output_path, + youtube_url.strip() + ] + try: + node_check = subprocess.run(["which", "node"], capture_output=True, text=True) + if node_check.returncode != 0: + cmd.extend(["--extractor-args", "youtube:player_client=web"]) + except: + pass + if progress_callback: + progress_callback(15, "Downloading from YouTube...") + result = subprocess.run(cmd, capture_output=True, text=True, timeout=600) + if result.returncode != 0: + error_msg = result.stderr[:500] + if "JavaScript runtime" in error_msg: + error_msg += "\n\n💡 Tip: Install Node.js or run: pip install yt-dlp --upgrade" + return None, f"yt-dlp failed: {error_msg}" + if os.path.exists(output_path): + return output_path, None + base = output_path.rsplit('.', 1)[0] + for ext in ['.m4a', '.mp3', '.webm', '.opus']: + alt_path = base + ext + if os.path.exists(alt_path): + return alt_path, None + return None, "Download completed but file not found" + except subprocess.TimeoutExpired: + return None, "Download timed out after 10 minutes" + except Exception as e: + return None, f"Error: {str(e)}" + +# ============================================================================= +# MAIN VIDEO PROCESSING +# ============================================================================= + +def process_single_video(url: str, custom_id: Optional[str] = None, + source_type: str = "unknown", + progress_bar=None, status_text=None, + overall_progress: Tuple[int, int] = (0, 1)) -> Dict[str, Any]: + result = { + "url": url, + "video_id": None, + "status": "pending", + "segments_count": 0, + "error": None, + "index_status": None, + "source_url": url, + "source_type": source_type, + "url_stored": False + } + try: + url_type = detect_url_type(url) + if url_type == "unknown": + result["status"] = "failed" + result["error"] = "Unknown URL type" + return result + video_id = custom_id.strip() if custom_id else generate_video_id(f"batch_{url}") + result["video_id"] = video_id + current, total = overall_progress + 
base_progress = int((current / total) * 100) if progress_bar else 0 + if status_text: + status_text.text(f"[{current}/{total}] Processing: {video_id}") + media_url = None + if url_type == "youtube": + if not check_yt_dlp(): + result["status"] = "failed" + result["error"] = "yt-dlp not installed" + return result + import tempfile + with tempfile.TemporaryDirectory() as tmpdir: + if status_text: + status_text.text(f"[{current}/{total}] Downloading from YouTube...") + output_path = f"{tmpdir}/youtube_{video_id}.m4a" + downloaded_path, error = download_youtube_audio(url.strip(), output_path) + if error: + result["status"] = "failed" + result["error"] = f"Download failed: {error}" + return result + with open(downloaded_path, 'rb') as f: + file_bytes = f.read() + blob_name = f"batch_youtube_{video_id}_{int(time.time())}.m4a" + if status_text: + status_text.text(f"[{current}/{total}] Uploading to Azure...") + sas_url, error = upload_to_azure_blob_sdk(file_bytes, blob_name) + if error and ("not installed" in error or "SDK" in error): + sas_url, error = upload_to_azure_blob_fixed(file_bytes, blob_name) + if error: + result["status"] = "failed" + result["error"] = f"Upload failed: {error}" + return result + media_url = sas_url + elif url_type == "direct": + media_url = url.strip() + if status_text: + status_text.text(f"[{current}/{total}] Using direct URL...") + if not media_url: + result["status"] = "failed" + result["error"] = "No media URL available" + return result + if status_text: + status_text.text(f"[{current}/{total}] Submitting to Speech API...") + submit_result = submit_transcription_direct(video_id, media_url) + operation_url = submit_result.get("operation_url") + if not operation_url: + result["status"] = "failed" + result["error"] = "No operation URL returned" + return result + max_polls = 120 + transcription_data = None + for i in range(max_polls): + time.sleep(POLL_SECONDS) + poll_result = poll_transcription_operation(operation_url) + status = 
poll_result.get("status", "unknown") + if progress_bar: + poll_progress = min(int((i / max_polls) * 20), 20) + overall = base_progress + int((1 / total) * 80) + int((1 / total) * poll_progress) + progress_bar.progress(min(overall, 99)) + if status.lower() == "succeeded": + transcription_data = get_transcription_from_result(poll_result) + break + elif status.lower() == "failed": + error_msg = poll_result.get("properties", {}).get("error", {}).get("message", "Unknown error") + result["status"] = "failed" + result["error"] = f"Transcription failed: {error_msg}" + return result + if not transcription_data: + result["status"] = "failed" + result["error"] = "Transcription timed out" + return result + if status_text: + status_text.text(f"[{current}/{total}] Processing segments...") + segments = process_transcription_to_segments(transcription_data, video_id) + result["segments_count"] = len(segments) + save_segments_to_blob(video_id, segments) + try: + index_result = index_segments_direct( + video_id, + segments, + source_url=url, + source_type=source_type + ) + result["url_stored"] = index_result.get('source_url_stored', False) + result["index_status"] = f"Indexed {index_result.get('indexed', 0)} documents" + st.session_state['debug_info'][video_id] = { + 'url_fields_available': index_result.get('url_fields_available', {}), + 'source_url_stored': index_result.get('source_url_stored', False), + 'source_type_stored': index_result.get('source_type_stored', False) + } + except Exception as e: + result["index_status"] = f"Indexing failed: {str(e)}" + result["status"] = "success" + except Exception as e: + result["status"] = "failed" + result["error"] = str(e) + import traceback + result["error"] += f"\n{traceback.format_exc()}" + return result \ No newline at end of file diff --git a/verify_fields.py b/verify_fields.py new file mode 100644 index 0000000..be7742b --- /dev/null +++ b/verify_fields.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +"""Verify URL fields were added to 
the index""" + +import os +import requests + +# Load .env +env_path = os.path.join(os.path.dirname(__file__), "ui", ".env") +env_vars = {} + +if os.path.exists(env_path): + with open(env_path) as f: + for line in f: + if '=' in line and not line.startswith('#'): + key, value = line.strip().split('=', 1) + env_vars[key] = value + +SEARCH_ENDPOINT = env_vars.get("SEARCH_ENDPOINT") +SEARCH_KEY = env_vars.get("SEARCH_KEY") +SEARCH_INDEX_NAME = env_vars.get("SEARCH_INDEX_NAME", "segments") + +API_VERSION = "2024-07-01" + +def check_index(): + url = f"{SEARCH_ENDPOINT}/indexes/{SEARCH_INDEX_NAME}?api-version={API_VERSION}" + headers = {"api-key": SEARCH_KEY} + + response = requests.get(url, headers=headers) + + if response.status_code == 200: + index = response.json() + fields = {f["name"]: f["type"] for f in index.get("fields", [])} + + print("✅ Successfully connected to index!") + print(f"\nTotal fields: {len(fields)}") + print(f"\nChecking URL tracking fields:") + + url_fields = { + "source_url": "Edm.String", + "source_type": "Edm.String", + "processed_at": "Edm.DateTimeOffset" + } + + all_present = True + for field, expected_type in url_fields.items(): + if field in fields: + print(f" ✅ {field}: {fields[field]}") + else: + print(f" ❌ {field}: MISSING") + all_present = False + + if all_present: + print("\n🎉 SUCCESS! All URL tracking fields are present!") + print("\nYou can now:") + print("1. Restart your Streamlit app") + print("2. Process new videos - URLs will be stored automatically") + else: + print("\n⚠️ Some fields are missing. Run the add script again.") + + return all_present + else: + print(f"❌ Failed to get index: {response.status_code}") + print(response.text) + return False + +if __name__ == "__main__": + check_index()