Merged
16 changes: 14 additions & 2 deletions CLAUDE.md
@@ -26,7 +26,7 @@
- History records are real (written to `results/`), retranscribe works too

## Architecture
- FastAPI app with SSE-based transcription pipeline: download → chunk → transcription API → save
- FastAPI app with SSE-based transcription pipeline: download → chunk → transcription API → save (with chunk cache for resume)
- History stored as markdown files with YAML frontmatter in `results/` — no database
- AI summarization via Chat Completions API (OpenAI or Gemini) — summaries stored as sidecar files `results/{record_id}_summary.md`
- Frontend is vanilla HTML/CSS/JS in `app/static/` — no build step
@@ -35,13 +35,18 @@
- Record IDs: 8-char hex from `uuid4().hex[:8]`, validated via regex before any file op
- `_resolve_path()` in `history.py`: validates ID format + glob lookup + path traversal guard
- SSE generators in `api.py`: yield progress events, handle client disconnect, `finally` marks interrupted records as failed
- SSE progress events include `chunk`, `chunks_total`, and `eta_seconds` fields during transcription (ETA from rolling average of per-chunk times)
- `complete_record()` / `fail_record()` rebuild the full YAML meta dict from parsed record — always include all fields
- `_write_md()` uses `sort_keys=False` to preserve frontmatter key order
- Audio cached in `results/{record_id}.mp3` after download — reused by retranscribe and same-URL re-transcriptions, deleted with record
- `find_cached_audio_by_url()` in `history.py`: scans existing records to skip re-download when the same URL is transcribed again
- Summary sidecar: `results/{record_id}_summary.md` with YAML frontmatter (prompt, created_at) + body
- `delete_record()` cascades to delete summary sidecar + audio cache
- `delete_record()` cascades to delete summary sidecar, audio cache, and chunk cache
- Video title prepended as first line of transcript and summary body in `api.py` (all codepaths: transcribe, demo, retranscribe, summarize, demo summarize)
- Chunk cache sidecar: `results/{record_id}_chunks.json` — stores completed chunk transcriptions for resume after interruption
- `_chunk_cache_key()` hashes `model|diarize|total` (SHA256, 16-char prefix) — cache invalidated when any parameter changes
- `load_chunk_cache()` / `save_chunk_cache()` / `delete_chunk_cache()` in `history.py` manage the chunk cache lifecycle
- Chunk cache saved after each chunk completes; deleted on successful transcription completion
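The cache-key scheme described above can be sketched as follows (simplified from `_chunk_cache_key` in `history.py`):

```python
import hashlib

def chunk_cache_key(model: str, diarize: bool, total: int) -> str:
    # Any change to model, diarization flag, or chunk count yields a
    # different key, which invalidates a stale cache on the next load.
    raw = f"{model}|{diarize}|{total}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

key = chunk_cache_key("gpt-4o-transcribe", False, 5)
```

Because the chunk count is part of the key, a cache written for a different `duration_limit` (and hence a different number of chunks) is discarded automatically.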

## Transcription Models
- `gpt-4o-transcribe` (default) — OpenAI Whisper, plain text output
@@ -51,6 +56,7 @@
- Model resolved once at request time via `resolve_model(model, diarize)` in `api.py` — same resolved values used for both storage (`get_stored_model`) and execution (`transcribe_chunk`)
- `resolve_model()` extracts base model from `-diarize` suffix for backward compat (e.g. `"gemini-2.0-flash-diarize"` → `("gemini-2.0-flash", True)`)
- Shared Gemini helpers (`is_gemini_model`, `get_client`) live in `app/clients.py` — used by both transcriber and summarizer
- Gemini retry logic: `MAX_GEMINI_RETRIES = 3` with exponential backoff (`2^attempt` seconds) on empty content — raises `RuntimeError` after all retries exhausted
- `duration_limit` API field is in minutes; converted to seconds (`* 60`) in the handler before storage
- `duration_limit` validation: `0 <= v <= 480` (0 = no limit, max = 8 hours)
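The two `duration_limit` bullets above reduce to a small amount of logic — a minimal sketch (the real handler uses a pydantic validator; the function name here is illustrative):

```python
def normalize_duration_limit(minutes: int) -> int:
    # API accepts minutes; 0 means "no limit", max 480 (8 hours).
    if not 0 <= minutes <= 480:
        raise ValueError("duration_limit must be between 0 and 480 minutes")
    return minutes * 60  # stored internally in seconds

print(normalize_duration_limit(90))  # 5400
```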

@@ -61,6 +67,11 @@
- No classes for data — records are plain dicts
- YAML frontmatter fields: title, url, status, duration, duration_limit, model, words (on complete), created_at, error (in this order)

## Logging
- Custom formatter configured in `app/main.py` (not `run.py`); `run.py` passes `log_config=None` to uvicorn
- Applies to `app`, `uvicorn`, `uvicorn.error`, `uvicorn.access` loggers — propagation disabled
- Startup log shows enabled providers and active models
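A rough sketch of this setup using the standard `logging` module (the actual format string lives in `app/main.py` and may differ):

```python
import logging

def setup_logging() -> None:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(name)s: %(message)s"  # illustrative format
    ))
    # Install the same handler on the app and uvicorn loggers, and disable
    # propagation so records are not duplicated by the root logger.
    for name in ("app", "uvicorn", "uvicorn.error", "uvicorn.access"):
        log = logging.getLogger(name)
        log.handlers = [handler]
        log.propagate = False
        log.setLevel(logging.INFO)
```

Disabling propagation is what keeps uvicorn's access lines from being printed twice once a custom handler is attached.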

## Testing Conventions
- `tmp_results` fixture monkeypatches `history.RESULTS_DIR` to a temp dir
- Test classes group related tests (e.g. `TestLifecycle`, `TestEdgeCases`)
@@ -116,3 +127,4 @@
- Old records may lack newer frontmatter fields — always use `.get("field", "")` with defaults
- `get_history()` strips `body` and `path` from returned dicts (metadata only)
- Temp files use UUID suffixes for isolation — cleanup uses glob patterns to find chunks
- Chunk cache JSON files (`_chunks.json`) are safely ignored by the history glob (same pattern isolation as summary sidecars)
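The defensive-defaults gotcha above ("always use `.get(...)`") in practice:

```python
def get_field(record: dict, field: str) -> str:
    # Old records may predate newer frontmatter fields — never KeyError.
    return record.get(field, "")

old = {"title": "Some video", "status": "done"}  # written before `words` existed
print(get_field(old, "title"))  # Some video
```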
14 changes: 8 additions & 6 deletions README.md
@@ -29,10 +29,10 @@
- **Multi-provider support** — OpenAI + Google Gemini (via OpenAI-compatible endpoint, zero extra deps)
- **Provider selector** — toggle between providers in the UI, persisted to localStorage
- **Duration limit** — transcribe only the first N minutes of a video
- **Audio caching** — re-transcriptions and same-URL transcriptions skip re-download
- **Audio & chunk caching** — re-transcriptions skip re-download; interrupted multi-chunk transcriptions resume from where they left off
- **Re-transcribe** — re-run any completed or failed transcription, optionally switching models
- **Expandable history** — click any completed transcript to preview the text inline
- **Real-time progress** streamed to the browser via Server-Sent Events
- **Real-time progress with ETA** streamed to the browser via Server-Sent Events, with per-chunk timing and estimated time remaining
- **Cancel** an in-progress transcription from the UI
- **History** with status tracking — persists across page refreshes and server restarts
- **Show in Finder** — reveal any saved transcript file on disk
@@ -114,7 +114,7 @@ transcript-maker/
├── .gitignore
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app factory + static mount
│ ├── main.py # FastAPI app factory, logging setup, startup log
│ ├── config.py # pydantic-settings (env vars)
│ ├── api.py # API routes (transcribe + history endpoints)
│ ├── clients.py # Shared OpenAI/Gemini client helpers
@@ -182,7 +182,7 @@ Accepted YouTube hostnames: `youtube.com`, `www.youtube.com`, `m.youtube.com`, `

| Event | Payload | When |
|---|---|---|
| `progress` | `{"stage": "...", "message": "...", "record_id": "..."}` | Each pipeline stage |
| `progress` | `{"stage": "...", "message": "...", "record_id": "...", "chunk": N, "chunks_total": N, "eta_seconds": N}` | Each pipeline stage (chunk/eta fields during transcription) |
| `transcript` | `{"text": "...", "title": "...", "duration_seconds": N, "duration_limit": N, "model": "...", "record_id": "..."}` | Transcription complete |
| `error` | `{"message": "...", "record_id": "..."}` | On failure |
| `done` | `{}` | Stream finished |
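As a client-side illustration, a `progress` payload from the table above could be rendered like this (field names are from the table; the helper itself is hypothetical):

```python
import json

def format_progress(data: str) -> str:
    # `data` is the JSON payload of a `progress` SSE event.
    ev = json.loads(data)
    msg = ev["message"]
    if "chunk" in ev and "chunks_total" in ev:
        msg += f" [{ev['chunk']}/{ev['chunks_total']}]"
    if "eta_seconds" in ev:
        msg += f" (~{int(ev['eta_seconds'])}s left)"
    return msg

payload = ('{"stage": "transcribing", "message": "Transcribing chunk 2 of 5...", '
           '"record_id": "ab12cd34", "chunk": 2, "chunks_total": 5, "eta_seconds": 84}')
print(format_progress(payload))  # Transcribing chunk 2 of 5... [2/5] (~84s left)
```

Since `chunk`/`chunks_total`/`eta_seconds` only appear during transcription, a client should treat them as optional, as this helper does.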
@@ -302,11 +302,11 @@ Records are sorted newest-first by `created_at`.
4. **Create record** — write `.md` file with `status: in_progress` and selected model
5. **Truncate** — if `duration_limit` is set, ffmpeg trims audio to the specified length
6. **Chunk** — ffmpeg splits audio into segments under 24 MB (if needed)
7. **Transcribe** — send each chunk to OpenAI Whisper or Gemini API sequentially (using selected model)
7. **Transcribe** — send each chunk to OpenAI Whisper or Gemini API sequentially; completed chunks are cached so interrupted transcriptions can resume
8. **Complete** — update `.md` to `status: done`, write transcript as body
9. **Cleanup** — delete temporary audio files

On error at any step, the record is updated to `status: error`. On client disconnect, the record stays `in_progress` (no partial saves).
On error at any step, the record is updated to `status: error`. On client disconnect, the record stays `in_progress` (no partial transcript is saved). Completed chunks, however, are cached in `{record_id}_chunks.json`, so a subsequent attempt skips already-transcribed chunks.
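The resume-plus-ETA behavior of step 7 above reduces to roughly this loop (a simplified sketch of the real handler in `app/api.py`; `transcribe` is a stand-in for the per-chunk API call):

```python
import time

def run_chunks(chunks, cached_parts, transcribe):
    # Resume: start from whatever a previous attempt already completed.
    parts = list(cached_parts)
    chunk_times = []
    for i, chunk in enumerate(chunks):
        if i < len(cached_parts):
            continue  # transcribed on a prior attempt — skip
        eta = None
        if chunk_times:
            # Rolling average of completed chunk times drives eta_seconds.
            eta = sum(chunk_times) / len(chunk_times) * (len(chunks) - i)
        t0 = time.monotonic()
        parts.append(transcribe(chunk))
        chunk_times.append(time.monotonic() - t0)
        # The real handler persists `parts` to {record_id}_chunks.json here
        # and emits an SSE progress event carrying `eta`.
    return parts
```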

## History & Persistence

@@ -337,6 +337,8 @@ Video title is prepended as the first line of both the transcript body and summa

**Audio cache:** `{record_id}.mp3` — cached audio file, reused by retranscribe. Deleted automatically when the parent record is deleted.

**Chunk cache:** `{record_id}_chunks.json` — stores completed chunk transcriptions as JSON for resume. Cache key is a SHA256 hash of model + diarize + chunk count; invalidated when any parameter changes. Deleted on successful completion or when the parent record is deleted.

**Status lifecycle:** `in_progress` → `done` | `error`

On server startup, any leftover `in_progress` records (from a prior crash) are automatically marked as `error`.
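The startup sweep can be imagined as the following (a sketch only — the actual implementation is `cleanup_stale_records()` in `history.py`, whose body is not shown in this diff, and the `fail_record` signature here is assumed):

```python
def mark_stale_as_error(records, fail_record):
    # Anything still `in_progress` at startup belongs to a crashed run.
    stale = [r for r in records if r.get("status") == "in_progress"]
    for r in stale:
        fail_record(r["id"], "interrupted by server restart")
    return len(stale)
```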
61 changes: 53 additions & 8 deletions app/api.py
@@ -25,6 +25,7 @@
get_record, get_record_status, reset_record,
save_audio, get_audio_path, find_cached_audio_by_url, RESULTS_DIR,
save_summary, get_summary,
load_chunk_cache, save_chunk_cache, delete_chunk_cache,
)
from app.transcriber import prepare_chunks, transcribe_chunk, cleanup_temp_files, get_stored_model, resolve_model
from app.summarizer import summarize_text
@@ -96,15 +97,19 @@ async def _demo_event_generator(url: str, request: Request, record_id: str | Non
await asyncio.sleep(1.0)

if num_chunks > 1:
yield {"event": "progress", "data": json.dumps({"stage": "transcribing", "message": f"Audio split into {num_chunks} chunks", "record_id": record_id})}
yield {"event": "progress", "data": json.dumps({"stage": "transcribing", "message": f"Audio split into {num_chunks} chunks", "record_id": record_id, "chunk": 0, "chunks_total": num_chunks})}

# Simulate transcription (10s total, split across chunks)
steps_per_chunk = max(1, int(DEMO_TRANSCRIBE_SECONDS / DEMO_TICK / num_chunks))
time_per_chunk = steps_per_chunk * DEMO_TICK
for chunk_i in range(num_chunks):
if await request.is_disconnected():
return
chunk_label = f" chunk {chunk_i + 1} of {num_chunks}" if num_chunks > 1 else ""
yield {"event": "progress", "data": json.dumps({"stage": "transcribing", "message": f"Transcribing{chunk_label}...", "record_id": record_id})}
progress_data: dict = {"stage": "transcribing", "message": f"Transcribing{chunk_label}...", "record_id": record_id, "chunk": chunk_i + 1, "chunks_total": num_chunks}
if chunk_i >= 1:
progress_data["eta_seconds"] = round(time_per_chunk * (num_chunks - chunk_i), 1)
yield {"event": "progress", "data": json.dumps(progress_data)}
for _ in range(steps_per_chunk):
if await request.is_disconnected():
return
@@ -325,15 +330,34 @@ async def event_generator():
if len(chunks) > 1:
yield {"event": "progress", "data": json.dumps({"stage": "transcribing", "message": f"Audio split into {len(chunks)} chunks", "record_id": record_id})}

# Transcribe
transcript_parts = []
# Transcribe (with chunk cache for resume)
cached_parts = load_chunk_cache(record_id, actual_model, diarize_flag, len(chunks))
transcript_parts = list(cached_parts)
chunk_times: list[float] = []
for i, chunk_path in enumerate(chunks):
if i < len(cached_parts):
logger.info("Chunk %d/%d cached, skipping", i + 1, len(chunks))
if len(chunks) > 1:
yield {"event": "progress", "data": json.dumps({"stage": "transcribing", "message": f"Chunk {i+1} of {len(chunks)} (cached)", "record_id": record_id, "chunk": i + 1, "chunks_total": len(chunks)})}
continue
if await request.is_disconnected():
logger.warning("Client disconnected during transcription")
break
yield {"event": "progress", "data": json.dumps({"stage": "transcribing", "message": f"Transcribing{f' chunk {i+1} of {len(chunks)}' if len(chunks) > 1 else ''}...", "record_id": record_id})}
progress_data: dict = {"stage": "transcribing", "message": f"Transcribing{f' chunk {i+1} of {len(chunks)}' if len(chunks) > 1 else ''}...", "record_id": record_id, "chunk": i + 1, "chunks_total": len(chunks)}
if chunk_times and len(chunks) > 1:
avg = sum(chunk_times) / len(chunk_times)
remaining = len(chunks) - i
progress_data["eta_seconds"] = round(avg * remaining)
yield {"event": "progress", "data": json.dumps(progress_data)}
t0 = time.monotonic()
text = await transcribe_chunk(chunk_path, model=actual_model, diarize=diarize_flag)
elapsed = time.monotonic() - t0
chunk_times.append(elapsed)
if len(chunks) > 1:
avg = sum(chunk_times) / len(chunk_times)
logger.info("Chunk %d/%d took %.1fs (avg %.1fs/chunk)", i + 1, len(chunks), elapsed, avg)
transcript_parts.append(text)
save_chunk_cache(record_id, actual_model, diarize_flag, len(chunks), transcript_parts)

# Guard: don't save partial transcript if client disconnected
if await request.is_disconnected():
@@ -343,6 +367,7 @@
full_text = f"{title}\n\n{' '.join(transcript_parts)}"
if not complete_record(record_id, full_text):
logger.warning("Transcription succeeded but history write failed for %s", record_id)
delete_chunk_cache(record_id)
logger.info("Transcription done: %s", record_id)
yield {"event": "transcript", "data": json.dumps({"text": full_text, "duration_seconds": duration, "duration_limit": limit_sec, "title": title, "record_id": record_id, "model": stored_model})}
yield {"event": "done", "data": "{}"}
@@ -440,15 +465,34 @@ async def event_generator():
if len(chunks) > 1:
yield {"event": "progress", "data": json.dumps({"stage": "transcribing", "message": f"Audio split into {len(chunks)} chunks", "record_id": record_id})}

# Transcribe
transcript_parts = []
# Transcribe (with chunk cache for resume)
cached_parts = load_chunk_cache(record_id, actual_model, diarize_flag, len(chunks))
transcript_parts = list(cached_parts)
chunk_times: list[float] = []
for i, chunk_path in enumerate(chunks):
if i < len(cached_parts):
logger.info("Chunk %d/%d cached, skipping", i + 1, len(chunks))
if len(chunks) > 1:
yield {"event": "progress", "data": json.dumps({"stage": "transcribing", "message": f"Chunk {i+1} of {len(chunks)} (cached)", "record_id": record_id, "chunk": i + 1, "chunks_total": len(chunks)})}
continue
if await request.is_disconnected():
logger.warning("Client disconnected during transcription")
break
yield {"event": "progress", "data": json.dumps({"stage": "transcribing", "message": f"Transcribing{f' chunk {i+1} of {len(chunks)}' if len(chunks) > 1 else ''}...", "record_id": record_id})}
progress_data: dict = {"stage": "transcribing", "message": f"Transcribing{f' chunk {i+1} of {len(chunks)}' if len(chunks) > 1 else ''}...", "record_id": record_id, "chunk": i + 1, "chunks_total": len(chunks)}
if chunk_times and len(chunks) > 1:
avg = sum(chunk_times) / len(chunk_times)
remaining = len(chunks) - i
progress_data["eta_seconds"] = round(avg * remaining)
yield {"event": "progress", "data": json.dumps(progress_data)}
t0 = time.monotonic()
text = await transcribe_chunk(chunk_path, model=actual_model, diarize=diarize_flag)
elapsed = time.monotonic() - t0
chunk_times.append(elapsed)
if len(chunks) > 1:
avg = sum(chunk_times) / len(chunk_times)
logger.info("Chunk %d/%d took %.1fs (avg %.1fs/chunk)", i + 1, len(chunks), elapsed, avg)
transcript_parts.append(text)
save_chunk_cache(record_id, actual_model, diarize_flag, len(chunks), transcript_parts)

# Guard: don't save partial transcript if client disconnected
if await request.is_disconnected():
@@ -458,6 +502,7 @@
full_text = f"{record['title']}\n\n{' '.join(transcript_parts)}"
if not complete_record(record_id, full_text):
logger.warning("Retranscription succeeded but history write failed for %s", record_id)
delete_chunk_cache(record_id)
logger.info("Retranscription done: %s", record_id)
yield {"event": "transcript", "data": json.dumps({"text": full_text, "duration_seconds": record["duration"], "duration_limit": limit_sec, "title": record["title"], "record_id": record_id, "model": stored_model})}
yield {"event": "done", "data": "{}"}
48 changes: 48 additions & 0 deletions app/history.py
@@ -1,3 +1,5 @@
import hashlib
import json
import logging
import re
import shutil
@@ -266,6 +268,7 @@ def delete_record(record_id: str) -> bool:
return False
audio = get_audio_path(record_id)
delete_summary(record_id)
delete_chunk_cache(record_id)
path.unlink(missing_ok=True)
if audio:
audio.unlink(missing_ok=True)
@@ -330,6 +333,51 @@ def delete_summary(record_id: str) -> None:
_summary_path(record_id).unlink(missing_ok=True)


def _chunk_cache_path(record_id: str) -> Path:
"""Return the path for a record's chunk cache sidecar file."""
return RESULTS_DIR / f"{record_id}_chunks.json"


def _chunk_cache_key(model: str, diarize: bool, total: int) -> str:
"""Hash of parameters that invalidate the chunk cache."""
raw = f"{model}|{diarize}|{total}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]


def load_chunk_cache(record_id: str, model: str, diarize: bool, total: int) -> list[str]:
"""Load cached chunk transcriptions. Returns list of completed parts (may be shorter than total).
Returns empty list if cache missing or invalidated (model/diarize/total mismatch)."""
path = _chunk_cache_path(record_id)
if not path.exists():
return []
try:
data = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
logger.warning("Could not read chunk cache for %s", record_id)
return []
if data.get("cache_key") != _chunk_cache_key(model, diarize, total):
logger.info("Chunk cache invalidated for %s (parameter mismatch)", record_id)
return []
parts = data.get("parts", [])
if not isinstance(parts, list):
return []
return parts


def save_chunk_cache(record_id: str, model: str, diarize: bool, total: int, parts: list[str]) -> None:
"""Persist current chunk transcription progress."""
RESULTS_DIR.mkdir(exist_ok=True)
data = {"cache_key": _chunk_cache_key(model, diarize, total), "parts": parts}
_chunk_cache_path(record_id).write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")


def delete_chunk_cache(record_id: str) -> None:
"""Remove chunk cache sidecar file."""
if not re.fullmatch(r"[0-9a-f]{8}", record_id):
return
_chunk_cache_path(record_id).unlink(missing_ok=True)


def cleanup_stale_records() -> None:
"""Mark any in_progress records as failed (stale from prior crash)."""
RESULTS_DIR.mkdir(exist_ok=True)