From 7702fa85a871eaac3a930ff8c370d74524fa1a66 Mon Sep 17 00:00:00 2001 From: Tommaso Ascani Date: Fri, 13 Mar 2026 19:05:33 +0100 Subject: [PATCH 1/3] feat(transcription): enhance transcription handling with interim messages and fallback logic --- deepgram_connector.py | 52 +++++++++++++++----- tests/test_deepgram_connector.py | 84 ++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 11 deletions(-) create mode 100644 tests/test_deepgram_connector.py diff --git a/deepgram_connector.py b/deepgram_connector.py index 844771b..82a4f52 100644 --- a/deepgram_connector.py +++ b/deepgram_connector.py @@ -51,6 +51,7 @@ def __init__(self, deepgram_api_key, rtp_stream_in, rtp_stream_out, mqtt_client, self.dg_connection = None self.loop = None self.complete_call = [] + self.latest_interim_by_channel = {} self._close_started = False self._close_lock = asyncio.Lock() self._first_transcript_logged = False @@ -131,12 +132,16 @@ async def on_message(self, client, result, **kwargs): speaker_counterpart_name = self.speaker_name_in speaker_counterpart_number = self.speaker_number_in try: + channel_index = result.channel_index[0] + segment_start = float(result.start) await self.mqtt_client.publish( topic='transcription', payload=json.dumps({ "uniqueid": self.uniqueid, "transcription": transcription, "timestamp": timestamp, + "channel_index": channel_index, + "segment_start": segment_start, "speaker_name": speaker_name, "speaker_number": speaker_number, "speaker_counterpart_name": speaker_counterpart_name, @@ -144,18 +149,23 @@ async def on_message(self, client, result, **kwargs): "is_final": result.is_final, }) ) + message = { + "uniqueid": self.uniqueid, + "transcription": transcription, + "timestamp": timestamp, + "channel_index": channel_index, + "segment_start": segment_start, + "speaker_name": speaker_name, + "speaker_number": speaker_number, + "speaker_counterpart_name": speaker_counterpart_name, + "speaker_counterpart_number": speaker_counterpart_number, + "is_final": result.is_final, + } + self.latest_interim_by_channel[channel_index] = message # save the transcription to the complete_call if it is final if result.is_final: - self.complete_call.append({ - "uniqueid": self.uniqueid, - "transcription": transcription, - "timestamp": timestamp, - "speaker_name": speaker_name, - "speaker_number": speaker_number, - "speaker_counterpart_name": speaker_counterpart_name, - "speaker_counterpart_number": speaker_counterpart_number, - "is_final": result.is_final, - }) + self.complete_call.append(message) + self.latest_interim_by_channel.pop(channel_index, None) except Exception as e: logger.error(f"Failed to schedule transcription publishing: {e}") @@ -287,15 +297,35 @@ async def close(self): await socket.close() except Exception as e: logger.debug(f"Deepgram socket close failed for {self.uniqueid}: {e}") + + # Give the SDK a short window to emit final transcript callbacks after finalize(). 
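+        # If no final results ever arrive in that window, the interim fallback below still produces a transcript.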
+ await asyncio.sleep(0.5) + # publish full conversation to mqtt + messages = list(self.complete_call) + if not messages and self.latest_interim_by_channel: + messages = sorted( + self.latest_interim_by_channel.values(), + key=lambda item: item.get("timestamp", 0), + ) + logger.info( + "Publishing fallback final transcript for %s using %d interim segment(s)", + self.uniqueid, + len(messages), + ) + text = "" last_speaker = None - for message in self.complete_call: + for message in messages: if last_speaker != message["speaker_name"]: text += f'\n{message["speaker_name"]}: ' text += f'{message["transcription"]}\n' last_speaker = message["speaker_name"] + if not text.strip(): + logger.warning("No transcript content available to publish for %s", self.uniqueid) + return + # publish the full conversation to mqtt await self.mqtt_client.publish( topic='final', diff --git a/tests/test_deepgram_connector.py b/tests/test_deepgram_connector.py new file mode 100644 index 0000000..63db5a1 --- /dev/null +++ b/tests/test_deepgram_connector.py @@ -0,0 +1,84 @@ +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from deepgram_connector import DeepgramConnector + + +def _make_connector(): + return DeepgramConnector( + deepgram_api_key="test-key", + rtp_stream_in=SimpleNamespace(reader=SimpleNamespace(read=lambda _: b"")), + rtp_stream_out=SimpleNamespace(reader=SimpleNamespace(read=lambda _: b"")), + mqtt_client=SimpleNamespace(publish=AsyncMock()), + uniqueid="1234567890.1", + language="en", + speaker_name_in="Alice", + speaker_number_in="100", + speaker_name_out="Bob", + speaker_number_out="200", + ) + + +@pytest.mark.asyncio +async def test_close_publishes_fallback_interim_transcript(monkeypatch): + connector = _make_connector() + monkeypatch.setenv("DEEPGRAM_FINALIZE_GRACE_SECONDS", "0") + + connector.latest_interim_by_channel = { + 0: { + "uniqueid": connector.uniqueid, + "transcription": "hello world", + "timestamp": 1.5, + "speaker_name": "Alice", + "speaker_number": "100", + "speaker_counterpart_name": "Bob", + "speaker_counterpart_number": "200", + "is_final": False, + } + } + + await connector.close() + + connector.mqtt_client.publish.assert_awaited_once() + _, payload = connector.mqtt_client.publish.await_args.kwargs["topic"], connector.mqtt_client.publish.await_args.kwargs["payload"] + assert connector.mqtt_client.publish.await_args.kwargs["topic"] == "final" + assert "hello world" in payload + + +@pytest.mark.asyncio +async def test_close_prefers_final_segments(monkeypatch): + connector = _make_connector() + monkeypatch.setenv("DEEPGRAM_FINALIZE_GRACE_SECONDS", "0") + + connector.complete_call = [ + { + "uniqueid": connector.uniqueid, + "transcription": "final text", + "timestamp": 2.0, + "speaker_name": "Alice", + "speaker_number": "100", + "speaker_counterpart_name": "Bob", + "speaker_counterpart_number": "200", + "is_final": True, + } + ] + connector.latest_interim_by_channel = { + 0: { + "uniqueid": connector.uniqueid, + "transcription": "interim text", + "timestamp": 1.0, + "speaker_name": "Alice", + "speaker_number": "100", + "speaker_counterpart_name": "Bob", + "speaker_counterpart_number": "200", + "is_final": False, + } + } + + await connector.close() + + payload = connector.mqtt_client.publish.await_args.kwargs["payload"] + assert "final text" in payload + assert "interim text" not in payload From 9aa0128132123f58af9ae10f553002ac16209335 Mon Sep 17 00:00:00 2001 From: Tommaso Ascani Date: Wed, 18 Mar 2026 15:27:27 +0100 Subject: [PATCH 2/3] 
feat(connector): improve connector state management and add tests for concurrency and error handling --- asterisk_bridge.py | 8 ++++++ deepgram_connector.py | 4 +-- tests/test_asterisk_bridge.py | 42 ++++++++++++++++++++++++++++++++ tests/test_deepgram_connector.py | 11 +++++++++ 4 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 tests/test_asterisk_bridge.py diff --git a/asterisk_bridge.py b/asterisk_bridge.py index 52415e4..c1a91bd 100644 --- a/asterisk_bridge.py +++ b/asterisk_bridge.py @@ -245,6 +245,7 @@ async def _handle_stasis_start(self, event): or self.channels[channel_id]['linkedid'] in self.pending_transcription_requests ) self.channels[channel_id]['connector_started'] = False + self.channels[channel_id]['connector_starting'] = False self.pending_transcription_requests.discard(channel_id) logger.debug(f"Channel {channel_id} entered Satellite. Details: {channel}") # Create a snoop channel for in and one for out @@ -386,11 +387,14 @@ async def _start_connector(self, channel_id): channel = self.channels[channel_id] if channel.get('connector_started'): return + if channel.get('connector_starting'): + return if 'rtp_stream_in' not in channel or 'rtp_stream_out' not in channel: logger.info(f"Transcription requested for {channel_id} but RTP streams are not ready yet") return + channel['connector_starting'] = True if channel.get('call_elapsed_at_start') is None: channel['call_elapsed_at_start'] = await self._get_answered_elapsed_seconds(channel_id) @@ -399,8 +403,11 @@ async def _start_connector(self, channel_id): await channel['connector'].start() channel['connector_started'] = True + channel['connector_starting'] = False logger.info(f"Deepgram connector started for channel {channel_id}") except Exception as e: + if channel_id in self.channels: + self.channels[channel_id]['connector_starting'] = False logger.error(f"Failed to start Deepgram connector for channel {channel_id}: {e}") # Close the channel if connector fails to start if channel_id in self.channels: @@ -444,6 +451,7 @@ async def stop_transcription(self, call_id): except Exception as e: logger.debug(f"Failed to close connector for channel {channel_id}: {e}") channel['connector_started'] = False + channel['connector_starting'] = False async def _handle_stasis_end(self, event): """Handle channel hangup event""" diff --git a/deepgram_connector.py b/deepgram_connector.py index 82a4f52..17d526a 100644 --- a/deepgram_connector.py +++ b/deepgram_connector.py @@ -169,13 +169,13 @@ async def on_message(self, client, result, **kwargs): except Exception as e: logger.error(f"Failed to schedule transcription publishing: {e}") - def on_metadata(self, client, metadata, **kwargs): + async def on_metadata(self, client, metadata, **kwargs): """ Handle metadata events """ return - def on_error(self, client, error, **kwargs): + async def on_error(self, client, error, **kwargs): """ Handle error events """ diff --git a/tests/test_asterisk_bridge.py b/tests/test_asterisk_bridge.py new file mode 100644 index 0000000..65c29d1 --- /dev/null +++ b/tests/test_asterisk_bridge.py @@ -0,0 +1,42 @@ +import asyncio +from unittest.mock import AsyncMock + +import pytest + +from asterisk_bridge import AsteriskBridge + + +@pytest.mark.asyncio +async def test_start_connector_ignores_concurrent_duplicate_start(): + bridge = AsteriskBridge( + url="http://ari.local", + app="satellite", + username="user", + password="pass", + mqtt_client=AsyncMock(), + rtp_server=AsyncMock(), + ) + + start_gate = asyncio.Event() + connector = AsyncMock() + 
connector.start = AsyncMock(side_effect=lambda: start_gate.wait()) + + bridge.channels["chan-1"] = { + "connector_started": False, + "connector_starting": False, + "rtp_stream_in": object(), + "rtp_stream_out": object(), + "call_elapsed_at_start": 0, + "connector": connector, + } + + first = asyncio.create_task(bridge._start_connector("chan-1")) + await asyncio.sleep(0) + second = asyncio.create_task(bridge._start_connector("chan-1")) + await asyncio.sleep(0) + + start_gate.set() + await asyncio.gather(first, second) + + connector.start.assert_awaited_once() + assert bridge.channels["chan-1"]["connector_started"] is True diff --git a/tests/test_deepgram_connector.py b/tests/test_deepgram_connector.py index 63db5a1..9b93bdf 100644 --- a/tests/test_deepgram_connector.py +++ b/tests/test_deepgram_connector.py @@ -82,3 +82,14 @@ async def test_close_prefers_final_segments(monkeypatch): payload = connector.mqtt_client.publish.await_args.kwargs["payload"] assert "final text" in payload assert "interim text" not in payload + + +@pytest.mark.asyncio +async def test_on_error_is_awaitable_and_schedules_close(): + connector = _make_connector() + connector.loop = AsyncMock() + connector.loop.is_running.return_value = True + + await connector.on_error(None, {"message": "boom"}) + + connector.loop.call_soon_threadsafe.assert_called_once() From 486e4818e9fcc94b39017d4af1bdce8d834f0b86 Mon Sep 17 00:00:00 2001 From: Tommaso Ascani Date: Thu, 19 Mar 2026 13:25:14 +0100 Subject: [PATCH 3/3] feat(ai): add text sanitization and error handling for LLM input --- ai.py | 74 +++++++++++++++++++++++++++++++++++++++++++----- tests/test_ai.py | 40 ++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 7 deletions(-) create mode 100644 tests/test_ai.py diff --git a/ai.py b/ai.py index da83986..7d1d4a6 100644 --- a/ai.py +++ b/ai.py @@ -1,9 +1,9 @@ import logging import time +import unicodedata from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate -from langchain_core.runnables import RunnablePassthrough from langchain_text_splitters import RecursiveCharacterTextSplitter @@ -35,6 +35,57 @@ def _clamp_sentiment(value: int) -> int: return value +def _sanitize_text_for_llm(text: str) -> str: + if not text: + return "" + + sanitized_chars = [] + changed = False + for char in text: + if char == "\x00" or unicodedata.category(char) == "Cs": + changed = True + continue + sanitized_chars.append(char) + + sanitized = "".join(sanitized_chars) + if sanitized != text: + changed = True + + # Ensure the payload can always be encoded as valid UTF-8 JSON. 
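+    # Belt-and-braces: the ignore-errors re-encode below drops anything the character filter above missed that still cannot be represented as UTF-8.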
+ utf8_sanitized = sanitized.encode("utf-8", errors="ignore").decode("utf-8") + if utf8_sanitized != sanitized: + changed = True + + if changed: + logger.warning( + "AI pipeline: sanitized text for LLM (before_len=%d after_len=%d)", + len(text), + len(utf8_sanitized), + ) + + return utf8_sanitized + + +def _is_invalid_json_body_error(error: Exception) -> bool: + message = str(error or "").lower() + return "parse the json body" in message or "not valid json" in message + + +def _invoke_chain(chain, text: str, stage: str) -> str: + sanitized_text = _sanitize_text_for_llm(text) + try: + response = chain.invoke({"text": sanitized_text}) + return (response.content or "").strip() + except Exception as error: + if not _is_invalid_json_body_error(error): + raise + + logger.warning("AI pipeline: retrying %s after invalid JSON body error", stage) + retry_text = _sanitize_text_for_llm(sanitized_text) + response = chain.invoke({"text": retry_text}) + return (response.content or "").strip() + + def generate_clean_summary_sentiment(text: str): """Generate cleaned transcription, summary, and sentiment (0-10). @@ -147,11 +198,11 @@ def generate_clean_summary_sentiment(text: str): for idx, chunk in enumerate(chunks): try: logger.debug("AI pipeline: cleaning chunk %d/%d (len=%d)", idx + 1, len(chunks), len(chunk)) - cleaned_chunks.append(clean_chain.invoke({"text": chunk}).content) + cleaned_chunks.append(_invoke_chain(clean_chain, chunk, f"clean chunk {idx + 1}/{len(chunks)}")) except Exception: logger.exception("AI pipeline: failed cleaning chunk %d/%d", idx + 1, len(chunks)) raise - cleaned = "\n\n".join([c.strip() for c in cleaned_chunks if c and c.strip()]).strip() + cleaned = _sanitize_text_for_llm("\n\n".join([c.strip() for c in cleaned_chunks if c and c.strip()]).strip()) logger.debug("AI pipeline: cleaned_len=%d", len(cleaned)) summarize_chunk_prompt = ChatPromptTemplate.from_messages( @@ -189,7 +240,13 @@ def generate_clean_summary_sentiment(text: str): for idx, chunk in enumerate(summarize_chunks): try: logger.debug("AI pipeline: summarizing chunk %d/%d (len=%d)", idx + 1, len(summarize_chunks), len(chunk)) - chunk_summaries.append(summarize_chunk_chain.invoke({"text": chunk}).content) + chunk_summaries.append( + _invoke_chain( + summarize_chunk_chain, + chunk, + f"summarize chunk {idx + 1}/{len(summarize_chunks)}", + ) + ) except Exception: logger.exception("AI pipeline: failed summarizing chunk %d/%d", idx + 1, len(summarize_chunks)) raise @@ -219,11 +276,14 @@ def generate_clean_summary_sentiment(text: str): ) reduce_chain = reduce_prompt | llm try: - summary = reduce_chain.invoke({"text": "\n\n".join([s.strip() for s in chunk_summaries if s and s.strip()])}).content + summary = _invoke_chain( + reduce_chain, + "\n\n".join([s.strip() for s in chunk_summaries if s and s.strip()]), + "reduce summaries", + ) except Exception: logger.exception("AI pipeline: failed reducing chunk summaries") raise - summary = (summary or "").strip() logger.debug("AI pipeline: summary_len=%d", len(summary)) sentiment_prompt = ChatPromptTemplate.from_messages( @@ -255,7 +315,7 @@ def generate_clean_summary_sentiment(text: str): sentiment_chain = sentiment_prompt | llm try: - sentiment_text = sentiment_chain.invoke({"text": cleaned[:20000]}).content + sentiment_text = _invoke_chain(sentiment_chain, cleaned[:20000], "sentiment scoring") except Exception: logger.exception("AI pipeline: failed sentiment scoring") raise diff --git a/tests/test_ai.py b/tests/test_ai.py new file mode 100644 index 0000000..c0da7d5 --- 
/dev/null +++ b/tests/test_ai.py @@ -0,0 +1,40 @@ +from types import SimpleNamespace + +import ai + + +class _FakeChain: + def __init__(self, responses): + self._responses = list(responses) + self.calls = [] + + def invoke(self, payload): + self.calls.append(payload) + response = self._responses.pop(0) + if isinstance(response, Exception): + raise response + return SimpleNamespace(content=response) + + +def test_sanitize_text_for_llm_removes_nulls_and_surrogates(): + raw = "hello\x00world\ud800!" + + sanitized = ai._sanitize_text_for_llm(raw) + + assert sanitized == "helloworld!" + + +def test_invoke_chain_retries_after_invalid_json_body_error(): + chain = _FakeChain( + [ + Exception("Error code: 400 - We could not parse the JSON body of your request."), + "final summary", + ] + ) + + content = ai._invoke_chain(chain, "hello\x00world", "reduce summaries") + + assert content == "final summary" + assert len(chain.calls) == 2 + assert chain.calls[0]["text"] == "helloworld" + assert chain.calls[1]["text"] == "helloworld"
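Note on the finalize grace period: the new connector tests monkeypatch DEEPGRAM_FINALIZE_GRACE_SECONDS, while close() hardcodes await asyncio.sleep(0.5). A minimal sketch of wiring the two together follows; the helper name and its default are illustrative assumptions, not part of this patch:

    import os

    def _finalize_grace_seconds(default: float = 0.5) -> float:
        # Honour the DEEPGRAM_FINALIZE_GRACE_SECONDS override the tests already set;
        # fall back to the current 0.5s behaviour when it is absent or malformed.
        raw = os.getenv("DEEPGRAM_FINALIZE_GRACE_SECONDS")
        if raw is None:
            return default
        try:
            return max(0.0, float(raw))
        except ValueError:
            return default

    # In DeepgramConnector.close(), the fixed sleep could then read:
    #     await asyncio.sleep(_finalize_grace_seconds())

With something like this in place the tests can set the grace period to 0 while production keeps the 0.5s default.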