Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 67 additions & 7 deletions ai.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging
import time
import unicodedata

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

from langchain_text_splitters import RecursiveCharacterTextSplitter

Expand Down Expand Up @@ -35,6 +35,57 @@ def _clamp_sentiment(value: int) -> int:
return value


def _sanitize_text_for_llm(text: str) -> str:
if not text:
return ""

sanitized_chars = []
changed = False
for char in text:
if char == "\x00" or unicodedata.category(char) == "Cs":
changed = True
continue
sanitized_chars.append(char)

sanitized = "".join(sanitized_chars)
if sanitized != text:
changed = True

# Ensure the payload can always be encoded as valid UTF-8 JSON.
utf8_sanitized = sanitized.encode("utf-8", errors="ignore").decode("utf-8")
if utf8_sanitized != sanitized:
changed = True

if changed:
logger.warning(
"AI pipeline: sanitized text for LLM (before_len=%d after_len=%d)",
len(text),
len(utf8_sanitized),
)

return utf8_sanitized


def _is_invalid_json_body_error(error: Exception) -> bool:
message = str(error or "").lower()
return "parse the json body" in message or "not valid json" in message


def _invoke_chain(chain, text: str, stage: str) -> str:
    """Invoke *chain* with sanitized *text*, retrying once on invalid-JSON 400s.

    *stage* is a human-readable label used only in the retry log line.
    Returns the stripped response content; any non-JSON-body error (and any
    error raised by the retry itself) propagates to the caller.
    """
    payload = _sanitize_text_for_llm(text)
    try:
        response = chain.invoke({"text": payload})
        return (response.content or "").strip()
    except Exception as error:
        if not _is_invalid_json_body_error(error):
            raise
        logger.warning("AI pipeline: retrying %s after invalid JSON body error", stage)
        # Sanitization is idempotent, so the retry sends an identical payload;
        # this is effectively a single plain retry against a transient 400.
        retried = chain.invoke({"text": _sanitize_text_for_llm(payload)})
        return (retried.content or "").strip()


def generate_clean_summary_sentiment(text: str):
"""Generate cleaned transcription, summary, and sentiment (0-10).

Expand Down Expand Up @@ -147,11 +198,11 @@ def generate_clean_summary_sentiment(text: str):
for idx, chunk in enumerate(chunks):
try:
logger.debug("AI pipeline: cleaning chunk %d/%d (len=%d)", idx + 1, len(chunks), len(chunk))
cleaned_chunks.append(clean_chain.invoke({"text": chunk}).content)
cleaned_chunks.append(_invoke_chain(clean_chain, chunk, f"clean chunk {idx + 1}/{len(chunks)}"))
except Exception:
logger.exception("AI pipeline: failed cleaning chunk %d/%d", idx + 1, len(chunks))
raise
cleaned = "\n\n".join([c.strip() for c in cleaned_chunks if c and c.strip()]).strip()
cleaned = _sanitize_text_for_llm("\n\n".join([c.strip() for c in cleaned_chunks if c and c.strip()]).strip())
logger.debug("AI pipeline: cleaned_len=%d", len(cleaned))

summarize_chunk_prompt = ChatPromptTemplate.from_messages(
Expand Down Expand Up @@ -189,7 +240,13 @@ def generate_clean_summary_sentiment(text: str):
for idx, chunk in enumerate(summarize_chunks):
try:
logger.debug("AI pipeline: summarizing chunk %d/%d (len=%d)", idx + 1, len(summarize_chunks), len(chunk))
chunk_summaries.append(summarize_chunk_chain.invoke({"text": chunk}).content)
chunk_summaries.append(
_invoke_chain(
summarize_chunk_chain,
chunk,
f"summarize chunk {idx + 1}/{len(summarize_chunks)}",
)
)
except Exception:
logger.exception("AI pipeline: failed summarizing chunk %d/%d", idx + 1, len(summarize_chunks))
raise
Expand Down Expand Up @@ -219,11 +276,14 @@ def generate_clean_summary_sentiment(text: str):
)
reduce_chain = reduce_prompt | llm
try:
summary = reduce_chain.invoke({"text": "\n\n".join([s.strip() for s in chunk_summaries if s and s.strip()])}).content
summary = _invoke_chain(
reduce_chain,
"\n\n".join([s.strip() for s in chunk_summaries if s and s.strip()]),
"reduce summaries",
)
except Exception:
logger.exception("AI pipeline: failed reducing chunk summaries")
raise
summary = (summary or "").strip()
logger.debug("AI pipeline: summary_len=%d", len(summary))

sentiment_prompt = ChatPromptTemplate.from_messages(
Expand Down Expand Up @@ -255,7 +315,7 @@ def generate_clean_summary_sentiment(text: str):
sentiment_chain = sentiment_prompt | llm

try:
sentiment_text = sentiment_chain.invoke({"text": cleaned[:20000]}).content
sentiment_text = _invoke_chain(sentiment_chain, cleaned[:20000], "sentiment scoring")
except Exception:
logger.exception("AI pipeline: failed sentiment scoring")
raise
Expand Down
8 changes: 8 additions & 0 deletions asterisk_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ async def _handle_stasis_start(self, event):
or self.channels[channel_id]['linkedid'] in self.pending_transcription_requests
)
self.channels[channel_id]['connector_started'] = False
self.channels[channel_id]['connector_starting'] = False
self.pending_transcription_requests.discard(channel_id)
logger.debug(f"Channel {channel_id} entered Satellite. Details: {channel}")
# Create a snoop channel for in and one for out
Expand Down Expand Up @@ -386,11 +387,14 @@ async def _start_connector(self, channel_id):
channel = self.channels[channel_id]
if channel.get('connector_started'):
return
if channel.get('connector_starting'):
return

if 'rtp_stream_in' not in channel or 'rtp_stream_out' not in channel:
logger.info(f"Transcription requested for {channel_id} but RTP streams are not ready yet")
return

channel['connector_starting'] = True
if channel.get('call_elapsed_at_start') is None:
channel['call_elapsed_at_start'] = await self._get_answered_elapsed_seconds(channel_id)

Expand All @@ -399,8 +403,11 @@ async def _start_connector(self, channel_id):

await channel['connector'].start()
channel['connector_started'] = True
channel['connector_starting'] = False
logger.info(f"Deepgram connector started for channel {channel_id}")
except Exception as e:
if channel_id in self.channels:
self.channels[channel_id]['connector_starting'] = False
logger.error(f"Failed to start Deepgram connector for channel {channel_id}: {e}")
# Close the channel if connector fails to start
if channel_id in self.channels:
Expand Down Expand Up @@ -444,6 +451,7 @@ async def stop_transcription(self, call_id):
except Exception as e:
logger.debug(f"Failed to close connector for channel {channel_id}: {e}")
channel['connector_started'] = False
channel['connector_starting'] = False

async def _handle_stasis_end(self, event):
"""Handle channel hangup event"""
Expand Down
56 changes: 43 additions & 13 deletions deepgram_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, deepgram_api_key, rtp_stream_in, rtp_stream_out, mqtt_client,
self.dg_connection = None
self.loop = None
self.complete_call = []
self.latest_interim_by_channel = {}
self._close_started = False
self._close_lock = asyncio.Lock()
self._first_transcript_logged = False
Expand Down Expand Up @@ -131,41 +132,50 @@ async def on_message(self, client, result, **kwargs):
speaker_counterpart_name = self.speaker_name_in
speaker_counterpart_number = self.speaker_number_in
try:
channel_index = result.channel_index[0]
segment_start = float(result.start)
await self.mqtt_client.publish(
topic='transcription',
payload=json.dumps({
"uniqueid": self.uniqueid,
"transcription": transcription,
"timestamp": timestamp,
"channel_index": channel_index,
"segment_start": segment_start,
"speaker_name": speaker_name,
"speaker_number": speaker_number,
"speaker_counterpart_name": speaker_counterpart_name,
"speaker_counterpart_number": speaker_counterpart_number,
"is_final": result.is_final,
})
)
message = {
"uniqueid": self.uniqueid,
"transcription": transcription,
"timestamp": timestamp,
"channel_index": channel_index,
"segment_start": segment_start,
"speaker_name": speaker_name,
"speaker_number": speaker_number,
"speaker_counterpart_name": speaker_counterpart_name,
"speaker_counterpart_number": speaker_counterpart_number,
"is_final": result.is_final,
}
self.latest_interim_by_channel[channel_index] = message
# save the transcription to the complete_call if it is final
if result.is_final:
self.complete_call.append({
"uniqueid": self.uniqueid,
"transcription": transcription,
"timestamp": timestamp,
"speaker_name": speaker_name,
"speaker_number": speaker_number,
"speaker_counterpart_name": speaker_counterpart_name,
"speaker_counterpart_number": speaker_counterpart_number,
"is_final": result.is_final,
})
self.complete_call.append(message)
self.latest_interim_by_channel.pop(channel_index, None)
except Exception as e:
logger.error(f"Failed to schedule transcription publishing: {e}")

def on_metadata(self, client, metadata, **kwargs):
async def on_metadata(self, client, metadata, **kwargs):
"""
Handle metadata events
"""
return

def on_error(self, client, error, **kwargs):
async def on_error(self, client, error, **kwargs):
"""
Handle error events
"""
Expand Down Expand Up @@ -287,15 +297,35 @@ async def close(self):
await socket.close()
except Exception as e:
logger.debug(f"Deepgram socket close failed for {self.uniqueid}: {e}")

# Give the SDK a short window to emit final transcript callbacks after finalize().
await asyncio.sleep(0.5)

# publish full conversation to mqtt
messages = list(self.complete_call)
if not messages and self.latest_interim_by_channel:
messages = sorted(
self.latest_interim_by_channel.values(),
key=lambda item: item.get("timestamp", 0),
)
logger.info(
"Publishing fallback final transcript for %s using %d interim segment(s)",
self.uniqueid,
len(messages),
)

text = ""
last_speaker = None
for message in self.complete_call:
for message in messages:
if last_speaker != message["speaker_name"]:
text += f'\n{message["speaker_name"]}: '
text += f'{message["transcription"]}\n'
last_speaker = message["speaker_name"]

if not text.strip():
logger.warning("No transcript content available to publish for %s", self.uniqueid)
return

# publish the full conversation to mqtt
await self.mqtt_client.publish(
topic='final',
Expand Down
40 changes: 40 additions & 0 deletions tests/test_ai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from types import SimpleNamespace

import ai


class _FakeChain:
def __init__(self, responses):
self._responses = list(responses)
self.calls = []

def invoke(self, payload):
self.calls.append(payload)
response = self._responses.pop(0)
if isinstance(response, Exception):
raise response
return SimpleNamespace(content=response)


def test_sanitize_text_for_llm_removes_nulls_and_surrogates():
    """NUL bytes and lone surrogates must be stripped before hitting the LLM."""
    dirty = "hello\x00world\ud800!"
    assert ai._sanitize_text_for_llm(dirty) == "helloworld!"


def test_invoke_chain_retries_after_invalid_json_body_error():
    """An invalid-JSON 400 error triggers exactly one sanitized retry."""
    bad_request = Exception(
        "Error code: 400 - We could not parse the JSON body of your request."
    )
    chain = _FakeChain([bad_request, "final summary"])

    result = ai._invoke_chain(chain, "hello\x00world", "reduce summaries")

    assert result == "final summary"
    # Both attempts must carry the same sanitized payload.
    assert [call["text"] for call in chain.calls] == ["helloworld", "helloworld"]
42 changes: 42 additions & 0 deletions tests/test_asterisk_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import asyncio
from unittest.mock import AsyncMock

import pytest

from asterisk_bridge import AsteriskBridge


@pytest.mark.asyncio
async def test_start_connector_ignores_concurrent_duplicate_start():
    """Two overlapping _start_connector calls must start the connector once.

    The first call blocks inside connector.start() on an event; the duplicate
    call arrives while 'connector_starting' is set and must bail out early.
    """
    bridge = AsteriskBridge(
        url="http://ari.local",
        app="satellite",
        username="user",
        password="pass",
        mqtt_client=AsyncMock(),
        rtp_server=AsyncMock(),
    )

    release = asyncio.Event()
    connector = AsyncMock()
    # Keep the first start() pending until we explicitly release it, so the
    # channel stays in the 'connector_starting' state for the duplicate call.
    connector.start = AsyncMock(side_effect=lambda: release.wait())

    bridge.channels["chan-1"] = {
        "connector_started": False,
        "connector_starting": False,
        "rtp_stream_in": object(),
        "rtp_stream_out": object(),
        "call_elapsed_at_start": 0,
        "connector": connector,
    }

    task_one = asyncio.create_task(bridge._start_connector("chan-1"))
    await asyncio.sleep(0)  # let the first call reach connector.start()
    task_two = asyncio.create_task(bridge._start_connector("chan-1"))
    await asyncio.sleep(0)  # let the duplicate observe 'connector_starting'

    release.set()
    await asyncio.gather(task_one, task_two)

    connector.start.assert_awaited_once()
    assert bridge.channels["chan-1"]["connector_started"] is True
Loading
Loading