Changes from all commits (27 commits)
663ee63
feat(transcription): introduce multi-provider abstraction layer
Amygos Feb 13, 2026
c99950d
feat(transcription): extract Deepgram provider implementation
Amygos Feb 13, 2026
c00f3a4
refactor(api): use transcription provider abstraction
Amygos Feb 13, 2026
bd702d6
test(transcription): add comprehensive provider unit tests
Amygos Feb 13, 2026
edd5bc2
docs(readme): document multi-provider configuration
Amygos Feb 13, 2026
b4cae58
docs: update architecture documentation for multiple providers
Amygos Feb 13, 2026
ed7b4d8
build(docker): add Mistral API key configuration
Amygos Feb 13, 2026
9ae7a1a
fix(docker): include transcription package in container image
Amygos Feb 13, 2026
3558f24
fix(transcription): enable VoXtral diarization by default
Amygos Feb 16, 2026
0209e32
test(transcription): verify VoXtral diarization default behavior
Amygos Feb 16, 2026
1b43d0d
fix(transcription): use correct speaker_id field from VoXtral API
Amygos Feb 16, 2026
0d7edfd
fixup! refactor(api): use transcription provider abstraction
Amygos Mar 16, 2026
80be3b2
fixup! feat(transcription): introduce multi-provider abstraction layer
Amygos Mar 16, 2026
c45a0f0
fixup! feat(transcription): extract Deepgram provider implementation
Amygos Mar 16, 2026
bb8a90d
fixup! feat(transcription): introduce multi-provider abstraction layer
Amygos Mar 16, 2026
2fe7c18
fixup! feat(transcription): extract Deepgram provider implementation
Amygos Mar 16, 2026
19ff749
fixup! feat(transcription): introduce multi-provider abstraction layer
Amygos Mar 16, 2026
84e07e2
fixup! fix(transcription): use correct speaker_id field from VoXtral API
Amygos Mar 16, 2026
6d28726
fixup! fix(transcription): use correct speaker_id field from VoXtral API
Amygos Mar 16, 2026
96b07a6
fixup! refactor(api): use transcription provider abstraction
Amygos Mar 16, 2026
41a1d54
fixup! refactor(api): use transcription provider abstraction
Amygos Mar 16, 2026
a74333c
fixup! build(docker): add Mistral API key configuration
Amygos Mar 16, 2026
5716fa2
fixup! docs: update architecture documentation for multiple providers
Amygos Mar 16, 2026
034d83c
fixup! docs(readme): document multi-provider configuration
Amygos Mar 16, 2026
7b1f896
fixup! docs(readme): document multi-provider configuration
Amygos Mar 16, 2026
a7a85e0
fixup! test(transcription): add comprehensive provider unit tests
Amygos Mar 16, 2026
513f84d
fixup! test(transcription): verify VoXtral diarization default behavior
Amygos Mar 16, 2026
63 changes: 63 additions & 0 deletions .github/copilot-instructions.md
@@ -0,0 +1,63 @@
# Copilot Instructions — Satellite

## Build & Test

```bash
# Install
pip install -r requirements.txt
pip install -r requirements-dev.txt # test deps (pytest, httpx, etc.)

# Run all tests (with coverage)
pytest

# Run a single test file / single test
pytest tests/test_api.py
pytest tests/test_api.py::TestGetTranscription::test_valid_wav_file

# Run the app
python main.py
```

Python 3.12+. No linter configured in CI — only `pytest` runs in the build workflow.
Container image uses `Containerfile` (multi-stage, python:slim base).

## Architecture

Satellite bridges Asterisk PBX ↔ transcription providers (Deepgram or VoxTral), publishing results over MQTT.

### Runtime components (all in one process)

| Module | Role |
|---|---|
| `main.py` | Entrypoint — starts the asyncio event loop for the real-time pipeline and a background thread running the FastAPI/Uvicorn HTTP server |
| `asterisk_bridge.py` | ARI WebSocket client — listens for Stasis events, creates snoop channels + external media, manages per-call lifecycle |
| `rtp_server.py` | UDP server — receives RTP audio, strips headers, routes packets to per-channel async queues by source port |
| `deepgram_connector.py` | Streams audio to Deepgram via WebSocket — interleaves two RTP channels into stereo for multichannel transcription; aggregates final transcript on hangup (real-time path only, Deepgram-only for now) |
| `mqtt_client.py` | Publishes interim/final transcription JSON to MQTT topics (`{prefix}/transcription`, `{prefix}/final`) |
| `transcription/` | **Provider abstraction** — `base.py` defines interface; `deepgram.py` and `voxtral.py` implement REST API clients; `__init__.py` factory selects provider via env var or per-request override |
| `api.py` | FastAPI app — `POST /api/get_transcription` accepts WAV uploads, calls transcription provider REST API, optionally persists to Postgres |
| `call_processor.py` | **Runs as a subprocess** (invoked from api.py via `subprocess.run`) — reads JSON from stdin, calls AI enrichment, writes results to DB |
| `ai.py` | LangChain + OpenAI — cleans transcript, generates summary + sentiment score (0-10) |
| `db.py` | PostgreSQL + pgvector — schema auto-init with threading lock; stores transcripts, state machine (`progress` → `summarizing` → `done` / `failed`), and text-embedding-3-small chunks |
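A minimal sketch of the provider abstraction (class names here are illustrative — the real definitions live in `transcription/base.py`; the factory function `get_provider`, the `raw_transcription`/`detected_language` result fields, and the `ValueError` on unknown provider names match how `api.py` uses the package):

```python
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class TranscriptionResult:
    # Normalized result shared by all providers; api.py reads these two fields
    raw_transcription: str
    detected_language: Optional[str] = None


class TranscriptionProvider(ABC):
    """Interface every provider implements (class name illustrative)."""

    @abstractmethod
    async def transcribe(self, audio_bytes: bytes, content_type: str,
                         params: dict) -> TranscriptionResult:
        ...


class DeepgramProvider(TranscriptionProvider):
    async def transcribe(self, audio_bytes, content_type, params):
        # Real implementation POSTs to Deepgram's /v1/listen REST endpoint
        raise NotImplementedError


class VoxtralProvider(TranscriptionProvider):
    async def transcribe(self, audio_bytes, content_type, params):
        # Real implementation calls Mistral's transcription REST API
        raise NotImplementedError


def get_provider(name: Optional[str] = None) -> TranscriptionProvider:
    # Per-request override wins; otherwise fall back to the env var default
    name = (name or os.getenv("TRANSCRIPTION_PROVIDER", "deepgram")).lower()
    providers = {"deepgram": DeepgramProvider, "voxtral": VoxtralProvider}
    if name not in providers:
        raise ValueError(f"Unknown transcription provider: {name}")
    return providers[name]()
```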

### Key data flows

1. **Real-time path:** Asterisk → ARI WebSocket → snoop channel → RTP → `rtp_server` → `deepgram_connector` (stereo WebSocket stream) → Deepgram → `mqtt_client` (Deepgram-only for now)
2. **REST/batch path:** WAV upload → `api.py` → `transcription/<provider>` REST API (Deepgram or VoxTral) → (optionally) `db.py` persist → (optionally) `call_processor.py` subprocess → `ai.py` → `db.py` update

### Non-obvious details

- Two RTP streams per call (one per direction) are interleaved into a single stereo buffer for Deepgram's multichannel mode (real-time path only).
- `asterisk_bridge` detects if Asterisk swapped the RTP source ports and adjusts speaker labels accordingly.
- `call_processor` is deliberately a **subprocess** (not async task) — isolates OpenAI calls with independent timeout/logging, avoids blocking the event loop.
- DB schema initialization is guarded by a **threading lock** (not asyncio lock) because `psycopg` sync connections are used alongside the async FastAPI server.
- **Multi-provider support:** REST/batch path supports Deepgram and VoxTral. Select provider via `TRANSCRIPTION_PROVIDER` env var (default: `deepgram`) or per-request `provider=` parameter. Real-time path remains Deepgram-only.
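The stereo interleaving mentioned in the first bullet can be sketched as follows — a simplified illustration, not the actual `deepgram_connector` code; the 16-bit sample width is an assumption:

```python
def interleave_stereo(ch0: bytes, ch1: bytes, sample_width: int = 2) -> bytes:
    """Interleave two mono PCM buffers into one stereo buffer (L R L R ...)."""
    # Truncate to the shorter stream so every left sample has a right sample
    n = min(len(ch0), len(ch1)) // sample_width
    out = bytearray()
    for i in range(n):
        out += ch0[i * sample_width:(i + 1) * sample_width]
        out += ch1[i * sample_width:(i + 1) * sample_width]
    return bytes(out)
```

Deepgram's multichannel mode then transcribes each channel separately, which is what makes per-direction speaker labels possible.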

## Conventions

- **Config:** Exclusively via environment variables (loaded from `.env` by `python-dotenv`). No config files or CLI args.
- **Logging:** One logger per module (`logging.getLogger(__name__)`), level controlled by `LOG_LEVEL` env var.
- **Async:** `asyncio` throughout the real-time pipeline; `asyncio.Lock` for connector close logic, `asyncio.Queue` for RTP buffer routing. Reconnection uses exponential backoff.
- **Testing:** `pytest-asyncio` with `asyncio_mode = auto`. Tests monkeypatch env vars and mock external services (Deepgram, MQTT, psycopg). A conftest auto-fixture resets `db._schema_initialized` between tests.
- **Auth:** Optional static bearer token (`API_TOKEN` env var) for `/api/*` endpoints. Accepts `Authorization: Bearer <token>` or `X-API-Token: <token>`.
- **Validation:** `uniqueid` must match `\d+\.\d+` (Asterisk format).
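A minimal sketch of that validation (helper name hypothetical; whether the app anchors the pattern is an assumption — `fullmatch` is used here for strictness):

```python
import re

# Asterisk uniqueid format, e.g. "1234567890.1234"
UNIQUEID_RE = re.compile(r"\d+\.\d+")


def is_valid_uniqueid(uniqueid: str) -> bool:
    return UNIQUEID_RE.fullmatch(uniqueid) is not None
```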
4 changes: 4 additions & 0 deletions Containerfile
@@ -15,6 +15,7 @@ COPY requirements.txt /tmp/requirements.txt
# Copy application files
COPY *.py /tmp/
COPY README.md /tmp/
COPY transcription /tmp/transcription

# Install dependencies
RUN pip install --no-cache-dir --no-warn-script-location --user -r /tmp/requirements.txt
@@ -37,6 +38,7 @@ COPY --from=builder /root/.local /root/.local
# Copy application files
COPY --from=builder /tmp/*.py /app/
COPY --from=builder /tmp/README.md /app/
COPY --from=builder /tmp/transcription /app/transcription

# Make sure scripts in .local are usable
ENV PATH=/root/.local/bin:$PATH
@@ -56,7 +58,9 @@ ENV ASTERISK_URL="http://127.0.0.1:8088" \
MQTT_USERNAME="satellite" \
SATELLITE_MQTT_PASSWORD="dummypassword" \
HTTP_PORT="8000" \
TRANSCRIPTION_PROVIDER="deepgram" \
DEEPGRAM_API_KEY="" \
MISTRAL_API_KEY="" \
LOG_LEVEL="INFO" \
PYTHONUNBUFFERED="1"

35 changes: 27 additions & 8 deletions README.md
@@ -49,9 +49,16 @@ RTP_HEADER_SIZE=12
MQTT_URL=mqtt://127.0.0.1:1883
MQTT_TOPIC_PREFIX=satellite

# Deepgram API Key
# Transcription Provider (optional, default: deepgram)
# Options: deepgram, voxtral
TRANSCRIPTION_PROVIDER=deepgram

# Deepgram API Key (required for Deepgram provider)
DEEPGRAM_API_KEY=your_deepgram_api_key

# Mistral API Key (required for VoxTral provider)
MISTRAL_API_KEY=your_mistral_api_key

# REST API (optional)
HTTP_PORT=8000

@@ -92,8 +99,10 @@ PGVECTOR_DATABASE=satellite
- `MQTT_URL`: URL of the MQTT broker
- `MQTT_TOPIC_PREFIX`: Prefix for MQTT topics

#### Deepgram Configuration
- `DEEPGRAM_API_KEY`: Your Deepgram API key
#### Transcription Configuration
- `TRANSCRIPTION_PROVIDER`: Choose the transcription provider (`deepgram` or `voxtral`, default: `deepgram`)
- `DEEPGRAM_API_KEY`: Your Deepgram API key (required for Deepgram provider)
- `MISTRAL_API_KEY`: Your Mistral API key (required for VoxTral provider)

#### REST API Configuration
- `HTTP_PORT`: Port for the HTTP server (default: 8000)
@@ -127,28 +136,38 @@ This requires the `vector` extension (pgvector) in your Postgres instance.

#### `POST /api/get_transcription`

Accepts a WAV upload and returns a Deepgram transcription.
Accepts a WAV upload and returns a transcription from the configured provider (Deepgram or VoxTral).

Request requirements:
- Content type: multipart form upload with a `file` field (`audio/wav` or `audio/x-wav`)
- Content type: multipart form upload with a `file` field (`audio/wav`, `audio/x-wav`, `audio/mpeg`, or `audio/mp3`)

Optional fields (query string or multipart form fields):
- `provider`: Override the transcription provider (`deepgram` or `voxtral`). If not set, uses `TRANSCRIPTION_PROVIDER` env var (default: `deepgram`)
- `uniqueid`: Asterisk-style uniqueid like `1234567890.1234` (required only when `persist=true`)
- `persist`: `true|false` (default `false`) — persist raw transcript to Postgres (requires `PGVECTOR_*` env vars)
- `summary`: `true|false` (default `false`) — run AI enrichment (requires `OPENAI_API_KEY` and also `persist=true` so there is a DB record to update)
- `channel0_name`, `channel1_name`: rename diarization labels in the returned transcript (replaces `Channel 0:` / `Channel 1:`)
- `channel0_name`, `channel1_name`: rename diarization labels in the returned transcript (replaces `Channel 0:` / `Channel 1:`, `Speaker 0:` / `Speaker 1:`, and their uppercase variants; both Deepgram and VoxTral output are normalized to this format)

Deepgram parameters:
- Most Deepgram `/v1/listen` parameters may be provided as query/form fields and are passed through to Deepgram.
Provider-specific parameters:
- **Deepgram**: Most Deepgram `/v1/listen` parameters may be provided as query/form fields (e.g., `model`, `language`, `diarize`, `punctuate`)
- **VoxTral**: Supports `model` (default: `voxtral-mini-latest`), `language`, `diarize`, `temperature`, `context_bias`, `timestamp_granularities`

Example:
```
# Using default provider (from TRANSCRIPTION_PROVIDER env var)
curl -X POST http://127.0.0.1:8000/api/get_transcription \
-H 'Authorization: Bearer YOUR_TOKEN' \
-F uniqueid=1234567890.1234 \
-F persist=true \
-F summary=true \
-F file=@call.wav;type=audio/wav

# Override provider to use VoxTral
curl -X POST http://127.0.0.1:8000/api/get_transcription \
-H 'Authorization: Bearer YOUR_TOKEN' \
-F provider=voxtral \
-F diarize=true \
-F file=@call.wav;type=audio/wav
```

Authentication:
166 changes: 47 additions & 119 deletions api.py
@@ -17,6 +17,7 @@


import db
from transcription import get_provider

app = FastAPI()
logger = logging.getLogger("api")
@@ -379,6 +380,7 @@ async def get_speech(request: Request):
        headers=headers_out,
    )


@api_router.post('/get_transcription')
async def get_transcription(
    request: Request,
@@ -410,6 +412,7 @@ async def get_transcription(
    uniqueid = (input_params.get("uniqueid") or "").strip()
    channel0_name = (input_params.get("channel0_name") or "").strip()
    channel1_name = (input_params.get("channel1_name") or "").strip()
    provider_name = (input_params.get("provider") or "").strip().lower() or None
    # Persist only when explicitly requested.
    persist = (input_params.get("persist") or "false").lower() in ("1", "true", "yes")
    summary = (input_params.get("summary") or "false").lower() in ("1", "true", "yes")
@@ -423,7 +426,7 @@

    transcript_id = None
    if db.is_configured() and persist:
        # Create/mark a DB row immediately so we can track state even if Deepgram fails.
        # Create/mark a DB row immediately so we can track state even if transcription fails.
        try:
            transcript_id = await run_in_threadpool(
                db.upsert_transcript_progress,
@@ -433,84 +436,27 @@
            logger.exception("Failed to initialize transcript row for state tracking")
            raise HTTPException(status_code=500, detail="Failed to initialize transcript persistence")

    # Valid Deepgram REST API parameters for /v1/listen endpoint
    deepgram_params = {
        "callback": "",
        "callback_method": "",
        "custom_topic": "",
        "custom_topic_mode": "",
        "custom_intent": "",
        "custom_intent_mode": "",
        "detect_entities": "",
        "detect_language": "true",
        "diarize": "",
        "dictation": "",
        "encoding": "",
        "extra": "",
        "filler_words": "",
        "intents": "",
        "keyterm": "",
        "keywords": "",
        "language": "",
        "measurements": "",
        "mip_opt_out": "",  # Opts out requests from the Deepgram Model Improvement Program
        "model": "nova-3",
        "multichannel": "",
        "numerals": "true",
        "paragraphs": "true",
        "profanity_filter": "",
        "punctuate": "true",
        "redact": "",
        "replace": "",
        "search": "",
        "sentiment": "false",
        "smart_format": "true",
        "summarize": "",
        "tag": "",
        "topics": "",
        "utterances": "",
        "utt_split": "",
        "version": "",
    }

    headers = {
        "Authorization": f"Token {DEEPGRAM_API_KEY}",
        "Content-Type": file.content_type
    }

    params = {}
    for k, v in deepgram_params.items():
        if k in input_params and input_params[k].strip():
            params[k] = input_params[k]
        elif v.strip():
            params[k] = v

    try:
        deepgram_timeout_seconds = 300.0
        timeout = httpx.Timeout(
            connect=10.0,
            read=deepgram_timeout_seconds,
            write=deepgram_timeout_seconds,
            pool=10.0,
        )
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                "https://api.deepgram.com/v1/listen",
                headers=headers,
                params=params,
                content=audio_bytes,
            )
        # Debug: log response meta and preview
        try:
            logger.debug(
                "Deepgram response: status=%s content_type=%s body_preview=%s",
                response.status_code,
                response.headers.get("Content-Type"),
                (response.text[:500] if response is not None and hasattr(response, "text") and response.text else ""),
            )
        except Exception:
            logger.debug("Failed to log Deepgram response preview")
        response.raise_for_status()

    # Get transcription provider
    try:
        provider = get_provider(provider_name)
    except ValueError as e:
        logger.error("Failed to get transcription provider: %s", str(e))
        if transcript_id is not None:
            try:
                await run_in_threadpool(db.set_transcript_state, transcript_id=transcript_id, state="failed")
            except Exception:
                logger.exception("Failed to update transcript state=failed")
        raise HTTPException(status_code=400, detail=str(e))

    # Call transcription provider
    try:
        result = await provider.transcribe(
            audio_bytes=audio_bytes,
            content_type=file.content_type,
            params=input_params,
        )
        raw_transcription = result.raw_transcription
        detected_language = result.detected_language
    except httpx.HTTPStatusError as e:
        if transcript_id is not None:
            try:
@@ -520,72 +466,54 @@
        try:
            status = e.response.status_code if e.response is not None else "unknown"
            body_preview = e.response.text[:500] if e.response is not None and hasattr(e.response, "text") and e.response.text else ""
            logger.error("Deepgram API error: status=%s body_preview=%s", status, body_preview)
            logger.error("Transcription API error: status=%s body_preview=%s", status, body_preview)
        except Exception:
            logger.error("Deepgram API error (logging failed)")
            logger.error("Transcription API error (logging failed)")
        raise HTTPException(status_code=e.response.status_code, detail=f"Deepgram API error: {e.response.text}")
        raise HTTPException(status_code=e.response.status_code, detail=f"Transcription API error: {e.response.text}")
    except httpx.TimeoutException:
        logger.warning("Deepgram request timed out (uniqueid=%s)", uniqueid)
        logger.warning("Transcription request timed out (uniqueid=%s)", uniqueid)
        if transcript_id is not None:
            try:
                await run_in_threadpool(db.set_transcript_state, transcript_id=transcript_id, state="failed")
            except Exception:
                logger.exception("Failed to update transcript state=failed")
        raise HTTPException(status_code=504, detail="Deepgram request timed out")
        raise HTTPException(status_code=504, detail="Transcription request timed out")
    except httpx.RequestError as e:
        logger.error("Deepgram request failed (uniqueid=%s): %s", uniqueid, str(e))
        logger.error("Transcription request failed (uniqueid=%s): %s", uniqueid, str(e))
        if transcript_id is not None:
            try:
                await run_in_threadpool(db.set_transcript_state, transcript_id=transcript_id, state="failed")
            except Exception:
                logger.exception("Failed to update transcript state=failed")
        raise HTTPException(status_code=502, detail="Failed to reach Deepgram")
    except Exception as e:
        logger.exception("Unexpected error while calling Deepgram")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

    result = response.json()
    detected_language = None  # always define; mocks may omit this field
    try:
        if "paragraphs" in result["results"] and "transcript" in result["results"]["paragraphs"]:
            raw_transcription = result["results"]["paragraphs"]["transcript"].strip()
        elif (
            "channels" in result["results"]
            and result["results"]["channels"]
            and "alternatives" in result["results"]["channels"][0]
            and result["results"]["channels"][0]["alternatives"]
            and "paragraphs" in result["results"]["channels"][0]["alternatives"][0]
            and "transcript" in result["results"]["channels"][0]["alternatives"][0]["paragraphs"]
        ):
            raw_transcription = (
                result["results"]["channels"][0]["alternatives"][0]["paragraphs"]["transcript"].strip()
            )
        else:
            logger.debug("failed to get paragraphs transcript")
            logger.debug(result)
            raise KeyError("paragraphs transcript not found")
        if "channels" in result["results"] and "detected_language" in result["results"]["channels"][0]:
            detected_language = result["results"]["channels"][0]["detected_language"]
        else:
            logger.debug("failed to get detected_language")
            logger.debug(result)
        if channel0_name:
            raw_transcription = raw_transcription.replace("Channel 0:", f"{channel0_name}:")
        if channel1_name:
            raw_transcription = raw_transcription.replace("Channel 1:", f"{channel1_name}:")
    except (KeyError, IndexError):
        logger.error("Failed to parse Deepgram transcription response: %s", response.text)
        raise HTTPException(status_code=500, detail="Failed to parse transcription response.")

        raise HTTPException(status_code=502, detail="Failed to reach transcription service")
    except ValueError as e:
        logger.error("Failed to parse transcription response: %s", str(e))
        if transcript_id is not None:
            try:
                await run_in_threadpool(db.set_transcript_state, transcript_id=transcript_id, state="failed")
            except Exception:
                logger.exception("Failed to update transcript state=failed")
        raise HTTPException(status_code=500, detail=str(e))
    except Exception as e:
        logger.exception("Unexpected error while calling transcription service")
        if transcript_id is not None:
            try:
                await run_in_threadpool(db.set_transcript_state, transcript_id=transcript_id, state="failed")
            except Exception:
                logger.exception("Failed to update transcript state=failed")
        raise HTTPException(status_code=500, detail="Unexpected error while processing transcription")

    # Apply channel name replacements (provider-agnostic post-processing)
    if channel0_name:
        raw_transcription = raw_transcription.replace("Channel 0:", f"{channel0_name}:")
        raw_transcription = raw_transcription.replace("CHANNEL 0:", f"{channel0_name}:")
        raw_transcription = raw_transcription.replace("Speaker 0:", f"{channel0_name}:")
        raw_transcription = raw_transcription.replace("SPEAKER 0:", f"{channel0_name}:")
    if channel1_name:
        raw_transcription = raw_transcription.replace("Channel 1:", f"{channel1_name}:")
        raw_transcription = raw_transcription.replace("CHANNEL 1:", f"{channel1_name}:")
        raw_transcription = raw_transcription.replace("Speaker 1:", f"{channel1_name}:")
        raw_transcription = raw_transcription.replace("SPEAKER 1:", f"{channel1_name}:")

    # Persist raw transcript when Postgres config is present (default) unless disabled per request.
    if transcript_id is not None: