Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/test_realtime_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def entrypoint(ctx: JobContext):
model="gpt-realtime-2025-08-28",
config=OpenAIRealtimeConfig(
voice="alloy", # alloy, ash, ballad, coral, echo, fable, onyx, nova, sage, shimmer, and verse
modalities=["audio", "text"],
modalities=["audio"],
turn_detection=TurnDetection(
type="server_vad",
threshold=0.5,
Expand Down
6 changes: 4 additions & 2 deletions videosdk-agents/videosdk/agents/metrics/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ class RealtimeTurnData:
def compute_latencies(self):
if self.user_speech_end_time and self.agent_speech_start_time:
self.ttfb = max(0, (self.agent_speech_start_time - self.user_speech_end_time) * 1000)
self.e2e_latency = self.ttfb
self.thinking_delay = self.ttfb
if self.user_speech_end_time and self.agent_speech_start_time:
self.thinking_delay = max(0, (self.agent_speech_start_time - self.user_speech_end_time) * 1000)
if self.user_speech_start_time and self.agent_speech_end_time:
self.e2e_latency = (self.agent_speech_end_time - self.user_speech_start_time) * 1000
if self.agent_speech_start_time and self.agent_speech_end_time:
self.agent_speech_duration = (self.agent_speech_end_time - self.agent_speech_start_time) * 1000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def __init__(self) -> None:
self.analytics_client = AnalyticsClient()
self.traces_flow_manager: Optional[TracesFlowManager] = None
self.playground: bool = False

def set_session_id(self, session_id: str):
"""Set the session ID for metrics tracking"""
self.analytics_client.set_session_id(session_id)
Expand Down Expand Up @@ -99,56 +98,44 @@ async def _start_new_interaction(self) -> None:
**RealtimeMetricsCollector._agent_info
)
self.turns.append(self.current_turn)
self.last_user_activity_time = None

def mark_user_activity(self, timestamp: Optional[float] = None) -> None:
"""Mark the time of the last user activity (e.g. transcription received)"""
self.last_user_activity_time = timestamp if timestamp is not None else time.perf_counter()

async def set_user_speech_start(self) -> None:
if self.current_turn and self.current_turn.agent_speech_start_time is not None and self.current_turn.agent_speech_end_time is None:
await self.set_interrupted()
if self.current_turn and (self.current_turn.user_speech_start_time is not None) and (self.current_turn.user_speech_end_time is not None):
if self.current_turn:
self._finalize_interaction_and_send()

await self._start_new_interaction()
if self.current_turn and self.current_turn.user_speech_start_time is None:
self.current_turn.user_speech_start_time = time.perf_counter()
await self.start_timeline_event("user_speech", self.current_turn.user_speech_start_time)

async def set_user_speech_end(self, timestamp: Optional[float] = None) -> None:
if self.current_turn and (self.current_turn.user_speech_start_time is not None) and (self.current_turn.user_speech_end_time is None):
if timestamp is not None:
self.current_turn.user_speech_end_time = timestamp
elif self.last_user_activity_time is not None:
self.current_turn.user_speech_end_time = self.last_user_activity_time
else:
self.current_turn.user_speech_end_time = time.perf_counter()
await self.end_timeline_event("user_speech", self.current_turn.user_speech_end_time)
await self.start_timeline_event("user_speech")

async def set_user_speech_end(self) -> None:
if self.current_turn and self.current_turn.user_speech_end_time is None:
self.current_turn.user_speech_end_time = time.perf_counter()
await self.end_timeline_event("user_speech")

async def set_agent_speech_start(self) -> None:
if not self.current_turn:
await self._start_new_interaction()
elif (self.current_turn.user_speech_start_time is not None) and (self.current_turn.user_speech_end_time is None):
await self.set_user_speech_end()
elif self.current_turn.user_speech_start_time is not None and self.current_turn.user_speech_end_time is None:
self.current_turn.user_speech_end_time = time.perf_counter()

await self.end_timeline_event("user_speech")

if self.current_turn and self.current_turn.agent_speech_start_time is None:
self.current_turn.agent_speech_start_time = time.perf_counter()
await self.start_timeline_event("agent_speech", self.current_turn.agent_speech_start_time)
await self.start_timeline_event("agent_speech")
if self.agent_speech_end_timer:
self.agent_speech_end_timer.cancel()

async def set_agent_speech_end(self, timeout: float = 1.0) -> None:
if self.current_turn:
if self.current_turn.agent_speech_start_time is None:
return
self.current_turn.agent_speech_end_time = time.perf_counter()
if self.agent_speech_end_timer:
self.agent_speech_end_timer.cancel()

loop = asyncio.get_event_loop()
self.agent_speech_end_timer = loop.call_later(timeout, self._finalize_interaction_and_send)
await self.end_timeline_event("agent_speech", self.current_turn.agent_speech_end_time)
await self.end_timeline_event("agent_speech")

async def set_a2a_handoff(self) -> None:
"""Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios."""
Expand All @@ -170,15 +157,12 @@ def _finalize_agent_speech(self) -> None:
self.agent_speech_end_timer = None

def _finalize_interaction_and_send(self) -> None:
if self.agent_speech_end_timer:
self.agent_speech_end_timer.cancel()
self.agent_speech_end_timer = None
if not self.current_turn:
return

self._finalize_agent_speech()

if (self.current_turn.user_speech_start_time is not None) and (self.current_turn.user_speech_end_time is None):
if self.current_turn.user_speech_start_time and not self.current_turn.user_speech_end_time:
self.current_turn.user_speech_end_time = time.perf_counter()

current_time = time.perf_counter()
Expand Down Expand Up @@ -232,18 +216,19 @@ async def add_timeline_event(self, event: TimelineEvent) -> None:
if self.current_turn:
self.current_turn.timeline.append(event)

async def start_timeline_event(self, event_type: str, start_time: float) -> None:
async def start_timeline_event(self, event_type: str) -> None:
"""Start a timeline event with a precise start time"""
if self.current_turn:
event = TimelineEvent(
event_type=event_type,
start_time=start_time
start_time=time.perf_counter()
)
self.current_turn.timeline.append(event)

async def end_timeline_event(self, event_type: str, end_time: float) -> None:
async def end_timeline_event(self, event_type: str) -> None:
"""End a timeline event and calculate duration"""
if self.current_turn:
end_time = time.perf_counter()
for event in reversed(self.current_turn.timeline):
if event.event_type == event_type and event.end_time is None:
event.end_time = end_time
Expand All @@ -268,7 +253,10 @@ async def set_user_transcript(self, text: str) -> None:
if self.current_turn:
if self.current_turn.user_speech_start_time is None:
self.current_turn.user_speech_start_time = time.perf_counter()
await self.start_timeline_event("user_speech", self.current_turn.user_speech_start_time)
await self.start_timeline_event("user_speech")
if self.current_turn.user_speech_end_time is None:
self.current_turn.user_speech_end_time = time.perf_counter()
await self.end_timeline_event("user_speech")
logger.info(f"user input speech: {text}")
await self.update_timeline_event_text("user_speech", text)

Expand All @@ -277,7 +265,7 @@ async def set_agent_response(self, text: str) -> None:
if self.current_turn:
if self.current_turn.agent_speech_start_time is None:
self.current_turn.agent_speech_start_time = time.perf_counter()
await self.start_timeline_event("agent_speech", self.current_turn.agent_speech_start_time)
await self.start_timeline_event("agent_speech")
logger.info(f"agent output speech: {text}")
await self.update_timeline_event_text("agent_speech", text)

Expand All @@ -288,19 +276,8 @@ def set_realtime_model_error(self, error: Dict[str, Any]) -> None:
self.current_turn.realtime_model_errors.append(error)

async def set_interrupted(self) -> None:
"""
Handle interruption by finalizing the current turn immediately.
Only marks as interrupted if the agent was actually speaking.
"""
if self.current_turn:
if self.current_turn.agent_speech_start_time is not None:
self.current_turn.interrupted = True
if self.current_turn.agent_speech_end_time is None:
self.current_turn.agent_speech_end_time = time.perf_counter()
await self.end_timeline_event("agent_speech", self.current_turn.agent_speech_end_time)
self._finalize_interaction_and_send()
else:
logger.debug("Interrupt signal received but agent hadn't started speaking - ignoring to preserve turn")
self.current_turn.interrupted = True

def finalize_session(self) -> None:
asyncio.run_coroutine_threadsafe(self._start_new_interaction(), asyncio.get_event_loop())
Expand Down
10 changes: 3 additions & 7 deletions videosdk-agents/videosdk/agents/realtime_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,7 @@ def _configure_components(self) -> None:
if self.avatar:
self.model.audio_track = getattr(job_context.room, 'agent_audio_track', None) or job_context.room.audio_track
elif self.audio_track:
self.model.audio_track = self.audio_track

if self.model.audio_track and hasattr(self.model.audio_track, "on_last_audio_byte"):
async def on_last_audio_byte() -> None:
logger.info("[RealTimePipeline] Audio playback finished — setting agent_speech_end_time")
await realtime_metrics_collector.set_agent_speech_end()
self.model.audio_track.on_last_audio_byte(on_last_audio_byte)
self.model.audio_track = self.audio_track

async def start(self, **kwargs: Any) -> None:
"""
Expand Down Expand Up @@ -135,6 +129,7 @@ def _on_agent_speech_ended(self, data: dict) -> None:
"""
Handle agent speech ended event and mark utterance as done, forwarding to agent if handler exists.
"""
asyncio.create_task(realtime_metrics_collector.set_agent_speech_end())
if self._current_utterance_handle and not self._current_utterance_handle.done():
self._current_utterance_handle._mark_done()
self.model.current_utterance = None
Expand Down Expand Up @@ -165,6 +160,7 @@ def on_user_speech_started(self, data: dict) -> None:
"""
Handle user speech started event
"""
asyncio.create_task(realtime_metrics_collector.set_user_speech_start())
self._notify_speech_started()
# self.interrupt() # Not sure yet whether this affects utterance handling.
if self.agent.session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ def __init__(
self.is_active = False
self.response_task = None
self._agent_speaking = False
self._user_speaking = False
self._user_transcript_received = False
self._initialize_bedrock_client()
self.input_sample_rate = 48000
self.target_sample_rate = 16000
Expand Down Expand Up @@ -321,8 +319,6 @@ async def handle_audio_input(self, audio_data: bytes) -> None:
try:
audio_array = np.frombuffer(audio_data, dtype=np.int16)

if audio_array.size == 0:
return
if len(audio_array) % 2 == 0:
audio_array = audio_array.reshape(-1, 2)
audio_array = np.mean(audio_array, axis=1).astype(np.int16)
Expand Down Expand Up @@ -398,19 +394,12 @@ async def _process_responses(self):
role = text_output.get(
"role", "UNKNOWN")
if role == "USER":
if transcript and isinstance(transcript, str) and transcript.strip():
realtime_metrics_collector.mark_user_activity()
if not self._user_speaking:
await realtime_metrics_collector.set_user_speech_start()
self._user_speaking = True
self._user_transcript_received = False
if not self._user_transcript_received:
await realtime_metrics_collector.set_user_speech_end()
self._user_speaking = False
self._user_transcript_received = True
await realtime_metrics_collector.set_user_transcript(
transcript
)
await realtime_metrics_collector.set_user_speech_start()
await realtime_metrics_collector.set_user_transcript(
transcript
)
await realtime_metrics_collector.set_user_speech_end()
await self.emit("user_speech_ended", {})
try:
await self.emit(
"realtime_model_transcription",
Expand Down Expand Up @@ -463,8 +452,8 @@ async def _process_responses(self):
audio_bytes = base64.b64decode(
audio_content)
if not self._agent_speaking:
await realtime_metrics_collector.set_agent_speech_start()
await self.emit("agent_speech_started", {})
await realtime_metrics_collector.set_agent_speech_start()
self._agent_speaking = True

if (
Expand All @@ -490,8 +479,11 @@ async def _process_responses(self):
"stopReason", "") == "END_TURN"
and self._agent_speaking
):
await self.emit("agent_speech_ended", {})
await realtime_metrics_collector.set_agent_speech_end(
timeout=1.0
)
self._agent_speaking = False
await self.emit("agent_speech_ended", {})

elif "usageEvent" in json_data["event"]:
pass
Expand All @@ -511,7 +503,9 @@ async def _process_responses(self):
print(
f"Nova completionEnd received: {json.dumps(completion_end, indent=2)}"
)
await self.emit("agent_speech_ended", {})
await realtime_metrics_collector.set_agent_speech_end(
timeout=1.0
)
self._agent_speaking = False

else:
Expand Down Expand Up @@ -598,12 +592,13 @@ async def interrupt(self) -> None:
if self.audio_track:
self.audio_track.interrupt()
print("Interrupting user speech, calling set_agent_speech_end")
await self.emit("user_speech_ended", {})
await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
await realtime_metrics_collector.set_interrupted()
if self._agent_speaking:
print("Interrupting agent speech, calling set_agent_speech_end")
await self.emit("agent_speech_ended", {})
await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
self._agent_speaking = False
self._user_transcript_received = False

content_end_payload = {
"event": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ async def _receive_loop(self, session: GeminiSession) -> None:
if self.current_utterance and not self.current_utterance.is_interruptible:
logger.info("Interruption is disabled for the current utterance. Ignoring server interrupt signal.")
continue
await realtime_metrics_collector.set_interrupted()

if active_response_id:
active_response_id = None
accumulated_text = ""
Expand Down Expand Up @@ -584,6 +584,9 @@ async def _receive_loop(self, session: GeminiSession) -> None:
accumulated_text = ""
final_transcription = ""
self.emit("agent_speech_ended", {})
await realtime_metrics_collector.set_agent_speech_end(
timeout=1.0
)
self._agent_speaking = False

except Exception as e:
Expand Down Expand Up @@ -677,6 +680,7 @@ async def handle_audio_input(self, audio_data: bytes) -> None:
"""Handle incoming audio data from the user"""
if not self._session or self._closing:
return

if self.current_utterance and not self.current_utterance.is_interruptible:
logger.info("Interruption is disabled for the current utterance. Not processing audio input.")
return
Expand Down
Loading