diff --git a/studio/api/app/api/agents.py b/studio/api/app/api/agents.py index 39df31c..0648ae8 100644 --- a/studio/api/app/api/agents.py +++ b/studio/api/app/api/agents.py @@ -6,7 +6,7 @@ from fastapi import APIRouter, Depends, HTTPException from loguru import logger -from pydantic import BaseModel +from pydantic import BaseModel, Field from pydantic_ai import Agent as PydanticAiAgent from pydantic_ai.messages import ModelRequest, ModelResponse, TextPart, UserPromptPart from pydantic_core import to_jsonable_python @@ -304,6 +304,11 @@ async def delete_agent( await db.delete(agent) +class EscalationDestination(BaseModel): + name: str = Field(..., min_length=1, max_length=50) + phone_number: str = Field(..., min_length=4, max_length=100, pattern=r'^(?:\+?[0-9\s\-\(\)]+|sips?:.+)$') + + class AgentConfigPatch(BaseModel): """Fields that can be patched directly on the agent's active config.""" @@ -313,6 +318,7 @@ class AgentConfigPatch(BaseModel): tts_provider: str | None = None tts_model: str | None = None gemini_live_model: str | None = None + escalation_destinations: list[EscalationDestination] | None = None regenerate_greeting: bool = False diff --git a/studio/api/app/api/voice_session.py b/studio/api/app/api/voice_session.py index 5432389..9807851 100644 --- a/studio/api/app/api/voice_session.py +++ b/studio/api/app/api/voice_session.py @@ -158,6 +158,7 @@ async def text_test_socket( ) greeting = "Hello! How can I help you today?" 
graph_json: str | None = None + escalation_destinations_json: str | None = None try: agent_uuid = uuid.UUID(agent_id) @@ -183,6 +184,8 @@ async def text_test_socket( else: system_prompt = cfg.get("system_prompt", system_prompt) greeting = cfg.get("greeting", greeting) + if destinations := cfg.get("escalation_destinations"): + escalation_destinations_json = json.dumps(destinations) except Exception: logger.warning("Text test: failed to resolve agent config for {}", agent_id) @@ -214,6 +217,7 @@ def _build_runner() -> AgentRunner: temperature=float(getattr(llm_cfg, "temperature", 0.7)), max_tokens=int(getattr(llm_cfg, "max_tokens", 512)), secrets=secrets or None, + escalation_destinations_json=escalation_destinations_json, ) runner = _build_runner() diff --git a/studio/api/voice_engine.pyi b/studio/api/voice_engine.pyi index a0de170..2476028 100644 --- a/studio/api/voice_engine.pyi +++ b/studio/api/voice_engine.pyi @@ -107,6 +107,7 @@ class AgentRunner: max_tokens: int = 32768, greeting: str | None = None, secrets: dict[str, str] | None = None, + escalation_destinations_json: str | None = None, ) -> None: ... def send(self, text: str) -> list[Any]: ... def start_turn(self, text: str) -> None: ... 
diff --git a/studio/web/src/components/agent/agent-config-editor.tsx b/studio/web/src/components/agent/agent-config-editor.tsx index 6793c95..50fee4d 100644 --- a/studio/web/src/components/agent/agent-config-editor.tsx +++ b/studio/web/src/components/agent/agent-config-editor.tsx @@ -135,6 +135,7 @@ function getConfigFields(config: Record): { tts_provider: string; tts_model: string; gemini_live_model: string; + escalation_destinations: { name: string; phone_number: string }[]; } { return { language: (config?.language as string) || "en", @@ -144,6 +145,7 @@ function getConfigFields(config: Record): { tts_model: (config?.tts_model as string) || "", gemini_live_model: (config?.geminiLiveModel as string) || (config?.gemini_live_model as string) || "", + escalation_destinations: (config?.escalation_destinations as { name: string; phone_number: string }[]) || [], }; } @@ -209,6 +211,9 @@ export default function AgentConfigEditor({ const [language, setLanguage] = useState(fields?.language ?? "en"); const [timezone, setTimezone] = useState(fields?.timezone ?? ""); const [voiceIdDraft, setVoiceIdDraft] = useState(fields?.voice_id ?? ""); + const [escalationDestinations, setEscalationDestinations] = useState<{id: string, name: string, phone_number: string}[]>( + (fields?.escalation_destinations ?? []).map((d: any) => ({ ...d, id: Math.random().toString(36).substring(7) })) + ); const [saving, setSaving] = useState(null); const [saved, setSaved] = useState(null); const [tts, setTts] = useState(null); @@ -284,11 +289,12 @@ export default function AgentConfigEditor({ setLanguage(fields.language); setTimezone(fields.timezone); setVoiceIdDraft(fields.voice_id); + setEscalationDestinations((fields.escalation_destinations ?? 
[]).map((d: any) => ({ ...d, id: Math.random().toString(36).substring(7) }))); // eslint-disable-next-line react-hooks/exhaustive-deps - }, [fields?.language, fields?.timezone, fields?.voice_id, fields?.gemini_live_model]); + }, [fields?.language, fields?.timezone, fields?.voice_id, fields?.gemini_live_model, fields?.escalation_destinations]); const patchField = useCallback( - async (payload: Record) => { + async (payload: Record) => { const primaryField = Object.keys(payload).find((k) => k !== "regenerate_greeting"); if (!primaryField && !payload.regenerate_greeting) return; const trackField = primaryField ?? "language"; @@ -783,6 +789,67 @@ export default function AgentConfigEditor({ + + } + label="Human Handoff / Escalation" + description="Configure escalation destinations for telephony agents" + className="items-start py-6 flex-col gap-4" + childrenClassName="w-full ml-0 pl-12" + > +
+ {escalationDestinations.map((dest, idx) => ( +
+ { + const newDests = [...escalationDestinations]; + newDests[idx].name = e.target.value; + setEscalationDestinations(newDests); + }} + onBlur={() => patchField({ escalation_destinations: escalationDestinations.map(({ id, ...rest }) => rest) })} + placeholder="Name (e.g. Sales)" + disabled={saving === "escalation_destinations"} + className="h-9 w-40 rounded-lg border border-border/60 bg-accent/30 px-3 text-xs font-bold text-foreground placeholder:text-muted-foreground/40 focus:outline-none focus:ring-1 focus:ring-primary/40 disabled:opacity-50 transition-all" + /> + { + const newDests = [...escalationDestinations]; + newDests[idx].phone_number = e.target.value; + setEscalationDestinations(newDests); + }} + onBlur={() => patchField({ escalation_destinations: escalationDestinations.map(({ id, ...rest }) => rest) })} + placeholder="Phone (e.g. +1234567890)" + disabled={saving === "escalation_destinations"} + className="h-9 w-48 rounded-lg border border-border/60 bg-accent/30 px-3 text-xs font-bold text-foreground placeholder:text-muted-foreground/40 focus:outline-none focus:ring-1 focus:ring-primary/40 disabled:opacity-50 transition-all font-mono" + /> + +
+ ))} + +
+
diff --git a/voice/engine/Cargo.toml b/voice/engine/Cargo.toml index 1c9b5f3..4286005 100644 --- a/voice/engine/Cargo.toml +++ b/voice/engine/Cargo.toml @@ -93,6 +93,7 @@ hex = "0.4.3" secrecy = "0.10.3" async-trait = "0.1.89" base64 = "0.22.1" +sha1 = "0.11.0" [features] default = [] diff --git a/voice/engine/crates/agent-kit/src/agent_backends/default.rs b/voice/engine/crates/agent-kit/src/agent_backends/default.rs index 7316cab..f29e7d8 100644 --- a/voice/engine/crates/agent-kit/src/agent_backends/default.rs +++ b/voice/engine/crates/agent-kit/src/agent_backends/default.rs @@ -19,7 +19,8 @@ use crate::micro_tasks; use crate::providers::{LlmCallConfig, LlmProvider, LlmProviderError}; use crate::swarm::{ build_node_tool_schemas, make_artifact_tool_schemas, make_hang_up_tool_schema, - make_on_hold_tool_schema, AgentGraphDef, SwarmState, HANG_UP_TOOL_NAME, ON_HOLD_TOOL_NAME, + make_on_hold_tool_schema, AgentGraphDef, EscalationDestination, SwarmState, + ESCALATE_CALL_TOOL_NAME, HANG_UP_TOOL_NAME, ON_HOLD_TOOL_NAME, }; use crate::tool_executor::{spawn_tool_task, ToolTaskResult}; use crate::agent_backends::ChatMessage; @@ -182,6 +183,14 @@ struct PendingHangUp { content: Option, } +/// A `escalate_call` tool call that was deferred during LLM streaming. +/// Emitted as `AgentEvent::EscalateCall` once all sibling tools complete. +struct PendingEscalateCall { + /// Resolved phone number (looked up from destination_name). + destination: String, + reason: String, +} + // ── Observability ──────────────────────────────────────────────── /// Per-LLM-call observability context. @@ -265,6 +274,18 @@ pub struct DefaultAgentBackend { /// other tools are still in-flight. Cleared at turn start / cancel. pending_hang_up: Option, + // ── Deferred escalate_call ── + /// Set when an `escalate_call` tool call is seen during LLM streaming. + /// Emitted as `AgentEvent::EscalateCall` after TTS drains. 
+ pending_escalate_call: Option, + + // ── Telephony/Escalation context ── + /// True when this session is over a telephony transport (Twilio/Telnyx). + /// Controls whether `escalate_call` tool is injected. + is_telephony: bool, + /// Pre-configured escalation targets for this agent. + escalation_destinations: Vec, + // ── Observability (per LLM call) ── /// Present while an LLM stream is active; consumed by `emit_llm_complete`. obs: Option, @@ -343,6 +364,9 @@ impl DefaultAgentBackend { pending_tokens: Vec::new(), pending_tool_calls: Vec::new(), pending_hang_up: None, + pending_escalate_call: None, + is_telephony: false, + escalation_destinations: Vec::new(), obs: None, } } @@ -352,6 +376,20 @@ impl DefaultAgentBackend { self.swarm.as_ref().and_then(|s| s.graph.timezone.clone()) } + /// Configure telephony context for this session. + /// + /// When `is_telephony` is `true` and `destinations` is non-empty, + /// the `escalate_call` tool is injected into the LLM tool schemas. + pub fn with_telephony_escalation( + mut self, + is_telephony: bool, + destinations: Vec, + ) -> Self { + self.is_telephony = is_telephony; + self.escalation_destinations = destinations; + self + } + /// Set a tool interceptor for intercepting tool execution. /// /// Used by [`ArtifactInterceptor`] in production and optionally by callers for @@ -371,7 +409,12 @@ impl DefaultAgentBackend { ) -> Result<(Vec, f64, u32, Option), String> { if let Some(swarm) = &self.swarm { if let Some(node) = swarm.active_def() { - let schemas = build_node_tool_schemas(node, &swarm.graph.tools); + let schemas = build_node_tool_schemas( + node, + &swarm.graph.tools, + self.is_telephony, + &self.escalation_destinations, + ); let temp = node.temperature.unwrap_or(self.config.temperature); let mt = node.max_tokens.unwrap_or(self.config.max_tokens); let model = node.model.clone(); @@ -627,6 +670,49 @@ impl DefaultAgentBackend { // The stream continues; hang_up is resolved at stream-end. 
} + /// Record an `escalate_call` tool call as a deferred marker. + /// + /// Resolves the `destination_name` argument to its actual phone number + /// using the pre-configured `escalation_destinations` list. + /// Emitted as `AgentEvent::EscalateCall` once the stream ends and all + /// sibling tool tasks complete. + fn handle_escalate_call(&mut self, tc: &ToolCallEvent) { + if self.pending_escalate_call.is_some() { + warn!("[agent_backend] duplicate escalate_call in same response — ignoring"); + return; + } + let args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_default(); + let destination_name = args + .get("destination_name") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let reason = args + .get("reason") + .and_then(|v| v.as_str()) + .unwrap_or("user_request") + .to_string(); + + // Resolve destination name → phone number + let destination = self + .escalation_destinations + .iter() + .find(|d| d.name == destination_name) + .map(|d| d.phone_number.clone()) + .unwrap_or_else(|| { + warn!( + "[agent_backend] escalate_call: unknown destination '{}' — using name as raw number", + destination_name + ); + destination_name.to_string() + }); + + info!( + "[agent_backend] escalate_call deferred: {} → {} (reason={})", + destination_name, destination, reason + ); + self.pending_escalate_call = Some(PendingEscalateCall { destination, reason }); + } + fn handle_on_hold(&mut self, tc: &ToolCallEvent) { let duration_mins = serde_json::from_str::(&tc.arguments) .ok() @@ -689,10 +775,6 @@ impl DefaultAgentBackend { } Some(InnerLlmEvent::ToolCall(tc)) => { // ── Intercept hang_up (synthetic runtime tool) ── - // Defer: record the intent and continue reading the stream - // so that any sibling tool calls / tokens in the same - // response are not lost. The HangUp event is emitted - // after the stream closes and all tool tasks complete. 
if tc.name == HANG_UP_TOOL_NAME { self.handle_hang_up(&tc); continue; @@ -701,8 +783,13 @@ impl DefaultAgentBackend { // ── Intercept on_hold (synthetic runtime tool) ── if tc.name == ON_HOLD_TOOL_NAME { self.handle_on_hold(&tc); - // Don't interrupt the stream — let the LLM continue - // generating its acknowledgement text. + continue; + } + + // ── Intercept escalate_call (synthetic telephony tool) ── + // Defer: let the LLM finish its farewell text, then emit EscalateCall. + if tc.name == ESCALATE_CALL_TOOL_NAME { + self.handle_escalate_call(&tc); continue; } @@ -761,6 +848,16 @@ impl DefaultAgentBackend { if self.tools_remaining == 0 { self.phase = Phase::Idle; + // EscalateCall takes highest priority (overrides HangUp) + if let Some(pe) = self.pending_escalate_call.take() { + info!("[agent_backend] escalate_call resolved at stream-end"); + // Clear any concurrent hang_up so the reactor only gets one shutdown signal. + self.pending_hang_up = None; + return Some(AgentEvent::EscalateCall { + destination: pe.destination, + reason: pe.reason, + }); + } // Pending hang_up takes priority over Finished. if let Some(ph) = self.pending_hang_up.take() { info!("[agent_backend] hang_up resolved at stream-end (no pending tools)"); @@ -836,10 +933,19 @@ impl DefaultAgentBackend { }; if self.tools_remaining == 0 { - // All tools done — check for deferred hang_up first. + // All tools done — check for deferred escalate/hang-up first. self.tool_rounds += 1; - if let Some(ph) = self.pending_hang_up.take() { + if let Some(pe) = self.pending_escalate_call.take() { + // escalate_call takes priority over hang_up. + info!("[agent_backend] escalate_call resolved after all tools completed"); + self.pending_hang_up = None; + self.phase = Phase::Idle; + self.event_buffer.push_back(AgentEvent::EscalateCall { + destination: pe.destination, + reason: pe.reason, + }); + } else if let Some(ph) = self.pending_hang_up.take() { // hang_up was deferred while tools were in-flight. 
// Now that every tool has completed we can end the session. // Buffer HangUp so it is emitted *after* this ToolCallCompleted. diff --git a/voice/engine/crates/agent-kit/src/agent_backends/mod.rs b/voice/engine/crates/agent-kit/src/agent_backends/mod.rs index bf2855a..1380502 100644 --- a/voice/engine/crates/agent-kit/src/agent_backends/mod.rs +++ b/voice/engine/crates/agent-kit/src/agent_backends/mod.rs @@ -14,7 +14,7 @@ use async_trait::async_trait; use crate::providers::LlmProviderError; use serde::{Deserialize, Serialize}; -use crate::swarm::{HANG_UP_TOOL_NAME, ON_HOLD_TOOL_NAME, TRANSFER_TOOL_NAME}; +use crate::swarm::{ESCALATE_CALL_TOOL_NAME, HANG_UP_TOOL_NAME, ON_HOLD_TOOL_NAME, TRANSFER_TOOL_NAME}; // ── Public Types ──────────────────────────────────────────────── @@ -105,6 +105,17 @@ pub enum AgentEvent { reason: String, content: Option, }, + /// The agent has requested call escalation / human handoff. + /// + /// Only meaningful for telephony sessions (Twilio/Telnyx). The voice engine + /// should drain TTS first, then issue a `TransportCommand::Transfer` before + /// closing the session. + EscalateCall { + /// E.164 phone number or SIP URI to forward the call to. + destination: String, + /// Human-readable reason for the escalation (for telemetry/logs). + reason: String, + }, /// The user asked to hold / pause — suppress idle shutdown until they return. OnHold { duration_secs: u32 }, /// An LLM generation has completed (for observability). @@ -322,3 +333,8 @@ pub fn is_hang_up_tool(name: &str) -> bool { pub fn is_on_hold_tool(name: &str) -> bool { name == ON_HOLD_TOOL_NAME } + +/// Check if a tool call is the synthetic `escalate_call` tool. 
+pub fn is_escalate_call_tool(name: &str) -> bool { + name == ESCALATE_CALL_TOOL_NAME +} diff --git a/voice/engine/crates/agent-kit/src/agent_backends/native.rs b/voice/engine/crates/agent-kit/src/agent_backends/native.rs index 0d6845f..43e112e 100644 --- a/voice/engine/crates/agent-kit/src/agent_backends/native.rs +++ b/voice/engine/crates/agent-kit/src/agent_backends/native.rs @@ -81,6 +81,9 @@ pub enum NativeAgentEvent { /// The agent called hang_up — session should end gracefully. HangUp { reason: String }, + /// The agent escalated the call to a human destination. + EscalateCall { destination: String, reason: String }, + /// The model finished speaking (turn boundary). TurnComplete { /// Prompt token count reported by the provider (0 = not available). @@ -164,7 +167,7 @@ impl NativeMultimodalBackend { let tool_schemas: Vec = agent_graph .and_then(|g| { g.nodes.get(&g.entry) - .map(|node| build_node_tool_schemas(node, &g.tools)) + .map(|node| build_node_tool_schemas(node, &g.tools, false, &[])) }) .unwrap_or_default(); @@ -184,6 +187,19 @@ impl NativeMultimodalBackend { } } + /// Appends telephony-specific tools (like `escalate_call`) to the backend if configured. + pub fn with_telephony_escalation( + mut self, + is_telephony: bool, + destinations: Vec, + ) -> Self { + if is_telephony && !destinations.is_empty() { + let schema = crate::swarm::make_escalate_call_tool_schema(&destinations); + self.tool_schemas.push(schema); + } + self + } + /// Open (or re-open) the WebSocket connection to the realtime backend. /// /// Passes the last saved `session_resumption_handle` so Gemini can restore @@ -232,6 +248,24 @@ impl NativeMultimodalBackend { self.provider.interrupt().await } + /// Push a transfer failure to the model. + pub async fn push_transfer_failure_result(&mut self, destination: String, reason: String) -> Result<(), LlmProviderError> { + let msg = if destination.is_empty() { + format!( + "System notification: Call transfer failed. 
The telephony provider returned: {}. \ + Please inform the user and ask how they would like to proceed.", + reason + ) + } else { + format!( + "System notification: Call transfer to {} failed. The telephony provider returned: {}. \ + Please inform the user and ask how they would like to proceed.", + destination, reason + ) + }; + self.provider.push_client_content(msg).await + } + /// Poll for the next event from the model. /// /// Returns `None` immediately if neither the model nor any in-flight tool @@ -332,6 +366,22 @@ impl NativeMultimodalBackend { return Some(NativeAgentEvent::HangUp { reason }); } + if name == crate::swarm::ESCALATE_CALL_TOOL_NAME { + // Extract payload correctly + let args_json = serde_json::from_str::(&arguments).unwrap_or_default(); + let destination = args_json.get("destination_name").and_then(|v| v.as_str()).unwrap_or_default().to_string(); + let reason = args_json.get("reason").and_then(|v| v.as_str()).unwrap_or("agent_initiated").to_string(); + + info!("[native-backend] escalate_call intercepted (dest={}, reason={})", destination, reason); + // Send a synthetic success result back to the model so it doesn't retry. 
+ let _ = self.provider.send_tool_result( + &call_id, + &name, + serde_json::json!({"result": format!("Forwarding call to {}...", destination)}), + ).await; + return Some(NativeAgentEvent::EscalateCall { destination, reason }); + } + self.dispatch_tool(call_id, name, arguments).await } } diff --git a/voice/engine/crates/agent-kit/src/lib.rs b/voice/engine/crates/agent-kit/src/lib.rs index 67ce27b..f54d9bc 100644 --- a/voice/engine/crates/agent-kit/src/lib.rs +++ b/voice/engine/crates/agent-kit/src/lib.rs @@ -18,9 +18,10 @@ pub mod swarm; // Re-exports for convenience pub use agent_backends::{ - default::DefaultAgentBackend, is_hang_up_tool, is_transfer_tool, AfterToolCallAction, - AgentBackend, AgentBackendConfig, AgentCommand, AgentEvent, AgentOutput, BeforeToolCallAction, - ChatMessage, LlmCommand, LlmOutput, SecretMap, SharedSecretMap, ToolInterceptor, + default::DefaultAgentBackend, is_escalate_call_tool, is_hang_up_tool, is_transfer_tool, + AfterToolCallAction, AgentBackend, AgentBackendConfig, AgentCommand, AgentEvent, AgentOutput, + BeforeToolCallAction, ChatMessage, LlmCommand, LlmOutput, SecretMap, SharedSecretMap, + ToolInterceptor, }; pub use artifact_store::{ArtifactInterceptor, ArtifactStore}; diff --git a/voice/engine/crates/agent-kit/src/providers/gemini_live.rs b/voice/engine/crates/agent-kit/src/providers/gemini_live.rs index 188f03b..f4500b4 100644 --- a/voice/engine/crates/agent-kit/src/providers/gemini_live.rs +++ b/voice/engine/crates/agent-kit/src/providers/gemini_live.rs @@ -9,7 +9,7 @@ //! //! ```text //! WebRTC PCM → push_user_audio() → BidiGenerateContentRealtimeInput →──┐ -//! │ ws +//! │ ws //! WebRTC speaker ← RealtimeEvent::BotAudioChunk ← recv_event() ←───────┘ //! ↓ //! Also yields ToolCall, Transcription @@ -265,16 +265,6 @@ type WsStream = tokio_tungstenite::WebSocketStream< // ── Gemini Live turn-phase state machine ───────────────────────────────────── /// Compile-time state machine for the Gemini Live **output** turn lifecycle. 
-/// -/// Replaces the old `output_transcript_buf: String` field. By owning the -/// accumulation buffer *inside* the `BotSpeaking` variant, the compiler -/// makes the following bugs impossible: -/// -/// | Old bug | Enforcement | -/// |---------|-------------| -/// | `TurnComplete` after barge-in double-emits | `complete()` on `Listening` → `None`, no-op | -/// | Reconnect leaves open streaming bubble | `cancel()` forces caller to handle partial text | -/// | Stale chunk accumulated post-barge-in | buffer doesn't exist in `Listening` | #[derive(Debug, Default)] pub(crate) enum GeminiLivePhase { /// Idle — waiting for user input or between turns. @@ -974,7 +964,6 @@ impl RealtimeProvider for GeminiLiveProvider { // Active start (Gemini 3.x): // // After setupComplete, send a single-space `realtimeInput` to kick off inference. - // This is the minimal trigger used by pipecat for Gemini 3.x models. // `clientContent` causes "Invalid argument" when concurrent mic audio is // already streaming into the socket. if msg.setup_complete.is_some() && self.session_resumption_handle.is_none() { @@ -1041,6 +1030,27 @@ impl RealtimeProvider for GeminiLiveProvider { async fn interrupt(&mut self) -> Result<(), LlmProviderError> { self.trigger_vad(VadState::Started).await } + + /// Send a system/client text message to the backend model. 
+ async fn push_client_content(&mut self, text: String) -> Result<(), LlmProviderError> { + let ws = self.ws.as_mut().ok_or_else(|| { + LlmProviderError::Transport("Not connected".to_string()) + })?; + + let msg = RealtimeInputMessage { + realtime_input: RealtimeInputPayload { + text: Some(text), + ..Default::default() + }, + }; + + let json = serde_json::to_string(&msg) + .map_err(|e| LlmProviderError::Provider(format!("text serialize: {e}")))?; + ws.send(Message::Text(json.into())) + .await + .map_err(|e| LlmProviderError::Transport(format!("text send: {e}")))?; + Ok(()) + } } diff --git a/voice/engine/crates/agent-kit/src/providers/realtime.rs b/voice/engine/crates/agent-kit/src/providers/realtime.rs index ec7c8df..7d49c52 100644 --- a/voice/engine/crates/agent-kit/src/providers/realtime.rs +++ b/voice/engine/crates/agent-kit/src/providers/realtime.rs @@ -83,4 +83,7 @@ pub trait RealtimeProvider: Send + Sync { /// Force a disruption/cut-off of the current bot generation (barge-in). async fn interrupt(&mut self) -> Result<(), LlmProviderError>; + + /// Send a text message to the model (ClientContent). + async fn push_client_content(&mut self, text: String) -> Result<(), LlmProviderError>; } diff --git a/voice/engine/crates/agent-kit/src/swarm.rs b/voice/engine/crates/agent-kit/src/swarm.rs index b03bab6..a31623e 100644 --- a/voice/engine/crates/agent-kit/src/swarm.rs +++ b/voice/engine/crates/agent-kit/src/swarm.rs @@ -107,6 +107,10 @@ pub const HANG_UP_TOOL_NAME: &str = "hang_up"; /// Name of the synthetic on-hold tool. pub const ON_HOLD_TOOL_NAME: &str = "on_hold"; +/// Name of the synthetic escalation/human-handoff tool. +/// Only injected for telephony sessions with configured destinations. +pub const ESCALATE_CALL_TOOL_NAME: &str = "escalate_call"; + // Re-export artifact tool names so callers don't need to depend on artifact_store directly. 
pub use crate::artifact_store::{LIST_ARTIFACTS_TOOL, READ_ARTIFACT_TOOL, SAVE_ARTIFACT_TOOL}; @@ -258,14 +262,75 @@ pub fn make_transfer_tool_schema(edges: &[String]) -> serde_json::Value { }) } +/// A pre-configured escalation destination available to the `escalate_call` tool. +/// +/// Mirrors `EscalationDestination` from the voice-engine types — redefined here +/// so `agent-kit` stays independent of the engine crate. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct EscalationDestination { + /// Human-readable label shown to the LLM (e.g. "Support Team"). + pub name: String, + /// E.164 phone number or SIP URI. + pub phone_number: String, +} + +/// Generate the `escalate_call` tool schema. +/// +/// Only injected for telephony sessions. `destinations` should contain at +/// least one entry; the LLM picks a target by the human-readable `name`. +pub fn make_escalate_call_tool_schema(destinations: &[EscalationDestination]) -> serde_json::Value { + let dest_names: Vec<&str> = destinations.iter().map(|d| d.name.as_str()).collect(); + let dest_list = dest_names.join(", "); + + json!({ + "type": "function", + "function": { + "name": ESCALATE_CALL_TOOL_NAME, + "description": format!( + r#"Transfer this call to a human agent or specialist. + +Call when: +- The caller explicitly requests to speak to a human. +- The issue cannot be resolved by the AI (e.g. complex complaints, exceptions, urgent situations). +- Your system instructions define specific escalation triggers. + +Available destinations: {dest_list} + +Do NOT call this to end the call — use hang_up for that. +This is irreversible once called: the call is immediately forwarded."# + ), + "parameters": { + "type": "object", + "properties": { + "destination_name": { + "type": "string", + "description": "Name of the escalation destination to forward the call to", + "enum": dest_names + }, + "reason": { + "type": "string", + "description": "Brief reason for the escalation (e.g. 
'user_request', 'complex_issue')" + } + }, + "required": ["destination_name", "reason"] + } + } + }) +} + /// Build the complete tool schemas for a node: its own tools + transfer_to. /// /// Tool schemas are generated from the node's `tools` list, resolved /// against the graph's `tools` map. Unknown tool names are logged as warnings /// and skipped. +/// +/// When `is_telephony` is `true` and `escalation_destinations` is non-empty, +/// the `escalate_call` tool is also injected. pub fn build_node_tool_schemas( node: &NodeDef, graph_tools: &HashMap, + is_telephony: bool, + escalation_destinations: &[EscalationDestination], ) -> Vec { let mut schemas: Vec = node .tools @@ -293,6 +358,11 @@ pub fn build_node_tool_schemas( // Always add on_hold tool so the agent can pause idle detection schemas.push(make_on_hold_tool_schema()); + // Inject escalate_call for telephony sessions with configured destinations + if is_telephony && !escalation_destinations.is_empty() { + schemas.push(make_escalate_call_tool_schema(escalation_destinations)); + } + // Always add artifact tools so the agent can persist important information schemas.extend(make_artifact_tool_schemas()); @@ -460,8 +530,12 @@ mod tests { #[test] fn node_tool_schemas_include_transfer() { let graph = sample_graph(); - let schemas = - build_node_tool_schemas(&graph.nodes["receptionist"], &graph.tools); + let schemas = build_node_tool_schemas( + &graph.nodes["receptionist"], + &graph.tools, + false, + &[], + ); // Base tools + transfer_to + hang_up + on_hold + artifacts assert!( @@ -493,7 +567,12 @@ mod tests { }"#, ) .unwrap(); - let schemas = build_node_tool_schemas(&graph.nodes["solo"], &graph.tools); + let schemas = build_node_tool_schemas( + &graph.nodes["solo"], + &graph.tools, + false, + &[], + ); // hang_up + on_hold + artifacts (no transfer_to) assert!(schemas.len() >= 1, "Should have at least hang_up"); assert_eq!(schemas[0]["function"]["name"].as_str().unwrap(), "hang_up"); diff --git 
a/voice/engine/crates/transport/Cargo.toml b/voice/engine/crates/transport/Cargo.toml index c6885a5..9b5ae74 100644 --- a/voice/engine/crates/transport/Cargo.toml +++ b/voice/engine/crates/transport/Cargo.toml @@ -34,7 +34,7 @@ audio-codec-algorithms = { version = "0.7.0", optional = true } [features] default = [] webrtc = ["str0m", "opus", "uuid", "reqwest"] -telephony = ["base64", "reqwest", "dep:audio-codec-algorithms"] +telephony = ["base64", "reqwest", "dep:audio-codec-algorithms", "uuid"] [dev-dependencies] envy = "0.4.2" diff --git a/voice/engine/crates/transport/src/lib.rs b/voice/engine/crates/transport/src/lib.rs index 14f1040..8f872c8 100644 --- a/voice/engine/crates/transport/src/lib.rs +++ b/voice/engine/crates/transport/src/lib.rs @@ -44,6 +44,9 @@ pub struct TransportHandle { /// Transport lifecycle events (connected, disconnected, control messages). pub control_rx: mpsc::UnboundedReceiver, + /// Send events back into the session (e.g. injected webhook results). + pub event_tx: mpsc::UnboundedSender, + /// Send control messages back to the client. pub control_tx: mpsc::UnboundedSender, @@ -53,6 +56,11 @@ pub struct TransportHandle { /// Background tasks owned by this transport (event loop, forward loop, etc.). /// Aborted on drop so they don't leak. pub(crate) _background_tasks: Vec>, + + /// True if this transport is a PSTN telephony connection (e.g. Twilio, Telnyx). + /// Used by the session layer to selectively enable telephony-specific agent tools + /// like call escalation / human handoff. + pub is_telephony: bool, } impl TransportHandle { @@ -65,6 +73,11 @@ impl TransportHandle { std::mem::replace(&mut self.audio_rx, dummy_rx) } + pub fn take_control_rx(&mut self) -> mpsc::UnboundedReceiver { + let (_, dummy_rx) = mpsc::unbounded_channel(); + std::mem::replace(&mut self.control_rx, dummy_rx) + } + /// Create a no-op transport handle that owns no resources. 
/// /// Used when a session path takes ownership of the real transport @@ -82,14 +95,16 @@ impl TransportHandle { } let (_, audio_rx) = mpsc::unbounded_channel(); let (control_tx, _) = mpsc::unbounded_channel(); - let (_, control_rx) = mpsc::unbounded_channel(); + let (event_tx, control_rx) = mpsc::unbounded_channel(); Self { audio_rx, audio_tx: Box::new(NullSink), control_rx, + event_tx, control_tx, input_sample_rate: 48_000, _background_tasks: vec![], + is_telephony: false, } } } @@ -111,6 +126,14 @@ pub enum TransportEvent { Disconnected { reason: String }, /// Client sent a JSON control message (e.g. session config). ControlMessage(serde_json::Value), + /// Result of a telephony transfer attempt, reported by the provider + /// via an asynchronous status callback. + TransferResult { + succeeded: bool, + destination: String, + /// Provider error code / reason when `!succeeded` + reason: Option, + }, } /// Commands from the session to the transport. @@ -120,6 +143,11 @@ pub enum TransportCommand { SendMessage(serde_json::Value), /// Signal the transport to close. Close, + /// Forward the active telephony call to a new destination (PSTN number or SIP URI). + /// + /// Only meaningful for telephony transports (Twilio/Telnyx). WebSocket and WebRTC + /// transports treat this as a no-op. + Transfer { destination: String }, /// Add a remote ICE candidate. 
#[cfg(feature = "webrtc")] AddIceCandidate(String), diff --git a/voice/engine/crates/transport/src/telephony/config.rs b/voice/engine/crates/transport/src/telephony/config.rs index a79164e..31f07ef 100644 --- a/voice/engine/crates/transport/src/telephony/config.rs +++ b/voice/engine/crates/transport/src/telephony/config.rs @@ -24,6 +24,8 @@ pub enum TelephonyCredentials { Telnyx { #[serde(default)] api_key: String, + #[serde(default)] + connection_id: String, }, } @@ -117,8 +119,6 @@ pub struct TelephonyConfig { #[serde(default)] pub outbound_encoding: TelephonyEncoding, - - /// Whether to automatically hang up the call when the session ends. #[serde(default = "default_auto_hang_up")] pub auto_hang_up: bool, @@ -126,6 +126,19 @@ pub struct TelephonyConfig { /// Telephony sample rate. Always 8000 Hz for PSTN. #[serde(default = "default_telephony_sample_rate")] pub sample_rate: u32, + + /// Publicly reachable URL for incoming webhooks (e.g. transfer status callbacks). + #[serde(default)] + pub public_url: Option, + + /// The provider phone number that placed / answered this call. + /// + /// Required for supervised transfer: when the AI initiates a transfer, we place + /// a *new* outbound call from this number to the destination. The destination + /// is then put into a Twilio/Telnyx Conference room; once they answer, we move + /// the original caller into the same room and terminate the AI leg. + #[serde(default)] + pub from_number: Option, } fn default_auto_hang_up() -> bool { @@ -146,10 +159,13 @@ impl Default for TelephonyConfig { outbound_encoding: TelephonyEncoding::default(), auto_hang_up: true, sample_rate: TELEPHONY_SAMPLE_RATE, + public_url: None, + from_number: None, } } } + impl TelephonyConfig { /// Get the current stream ID (locks briefly). 
pub fn get_stream_id(&self) -> String { diff --git a/voice/engine/crates/transport/src/telephony/providers/mod.rs b/voice/engine/crates/transport/src/telephony/providers/mod.rs index 12b1a6c..9977e6c 100644 --- a/voice/engine/crates/transport/src/telephony/providers/mod.rs +++ b/voice/engine/crates/transport/src/telephony/providers/mod.rs @@ -15,10 +15,15 @@ use super::config::{TelephonyConfig, TelephonyCredentials}; /// Trait that each telephony provider (Twilio, Telnyx, etc.) implements. /// -/// Covers three concerns: +/// Covers four concerns: /// 1. **JSON framing** — how to build outbound media/clear frames /// 2. **Event parsing** — how to extract stream/call IDs from the `start` event -/// 3. **Call control** — how to hang up via REST API +/// 3. **Unsupervised call control** — hang up +/// 4. **Supervised transfer** — place an outbound call to `destination`; +/// when they answer, put them into a named Conference room. +/// The original caller is then bridged into the same room by the webhook +/// handler (`ServerState::telephony_transfer_status_handler`), and our +/// WebSocket leg is terminated cleanly *after* the bridge is complete. #[async_trait] pub trait TelephonyProviderImpl: Send + Sync { /// Human-readable provider name (for logging). @@ -48,8 +53,47 @@ pub trait TelephonyProviderImpl: Send + Sync { .map(String::from) } - /// Hang up the call via the provider's REST API. + /// Hang up the call unconditionally via the provider's REST API. async fn hangup(&self, config: &TelephonyConfig, call_id: &str) -> Result<(), TransportError>; + + /// **Conference-based supervised transfer** — the correct approach for keeping + /// the AI session alive while the destination rings. + /// + /// # Protocol + /// 1. Place a *new* outbound call from `from_number` to `destination`. + /// 2. Instruct the provider: once the destination answers, join them into + /// a Conference named `conference_name`. + /// 3. 
Register `transfer_callback_url` as the StatusCallback for the + /// outbound leg so we can learn when the destination answered or failed. + /// 4. The original call (`original_call_id`) is **not touched** here — the AI + /// session's WebSocket stays alive. + /// 5. When the callback fires with `answered`, the webhook handler moves + /// the original caller into the Conference and sends `TransferResult::succeeded`. + /// 6. If the callback fires with `busy`/`no-answer`/`failed`, the webhook + /// sends `TransferResult::failed` and the AI resumes the conversation. + /// + /// Returns the provider-assigned SID of the newly placed outbound call leg. + async fn initiate_supervised_transfer( + &self, + config: &TelephonyConfig, + original_call_id: &str, + destination: &str, + from_number: &str, + conference_name: &str, + transfer_callback_url: &str, + ) -> Result; + + /// Move `call_id` into a named Conference room. + /// + /// Used by the webhook handler after the destination has answered: this + /// replaces the original caller's active TwiML with a `` verb + /// so they are bridged to the destination who is already in the room. + async fn bridge_call_to_conference( + &self, + config: &TelephonyConfig, + call_id: &str, + conference_id_or_name: &str, + ) -> Result; } /// Create a boxed provider implementation from the enum variant. diff --git a/voice/engine/crates/transport/src/telephony/providers/telnyx.rs b/voice/engine/crates/transport/src/telephony/providers/telnyx.rs index 5f9f920..fce3d9d 100644 --- a/voice/engine/crates/transport/src/telephony/providers/telnyx.rs +++ b/voice/engine/crates/transport/src/telephony/providers/telnyx.rs @@ -3,6 +3,7 @@ //! 
Protocol reference: https://developers.telnyx.com/docs/voice/media-streaming use async_trait::async_trait; +use base64::Engine; use tracing::{error, info, warn}; use crate::error::TransportError; @@ -63,7 +64,7 @@ impl TelephonyProviderImpl for Telnyx { call_id: &str, ) -> Result<(), TransportError> { let api_key = match &config.credentials { - super::super::config::TelephonyCredentials::Telnyx { api_key } => api_key.as_str(), + super::super::config::TelephonyCredentials::Telnyx { api_key, .. } => api_key.as_str(), _ => return Err(TransportError::SendFailed("Invalid credentials for Telnyx provider".into())), }; @@ -103,4 +104,223 @@ impl TelephonyProviderImpl for Telnyx { } } } + + /// Conference-based supervised transfer using Telnyx Call Control API. + /// + /// Telnyx does not support inline TwiML for outbound calls; instead we: + /// 1. Create a new outbound call via `POST /v2/calls` with a `webhook_url` + /// pointing at `transfer_callback_url`. + /// 2. Once the outbound call answers (Telnyx sends `call.answered` webhook), + /// the webhook handler calls `conference.join` on the Telnyx Call Control API + /// for both legs. + /// + /// Returns the `call_control_id` of the outbound leg. 
+ async fn initiate_supervised_transfer( + &self, + config: &super::super::config::TelephonyConfig, + original_call_id: &str, + destination: &str, + from_number: &str, + conference_name: &str, + transfer_callback_url: &str, + ) -> Result { + let (api_key, connection_id) = match &config.credentials { + super::super::config::TelephonyCredentials::Telnyx { api_key, connection_id } => { + (api_key.as_str(), connection_id.as_str()) + } + _ => return Err(TransportError::SendFailed("Invalid credentials for Telnyx provider".into())), + }; + + let client_state = base64::engine::general_purpose::STANDARD.encode( + serde_json::to_string(&serde_json::json!({ + "original_call_id": original_call_id, + "conference_name": conference_name, + })).map_err(|e| TransportError::SendFailed(e.to_string()))? + ); + + let payload = serde_json::json!({ + "connection_id": connection_id, + "to": destination, + "from": from_number, + "webhook_url": transfer_callback_url, + "webhook_url_method": "POST", + "client_state": client_state, + }); + + let client = reqwest::Client::new(); + let resp = client + .post("https://api.telnyx.com/v2/calls") + .header("Authorization", format!("Bearer {}", api_key)) + .header("Content-Type", "application/json") + .body(payload.to_string()) + .send() + .await + .map_err(|e| { + TransportError::SendFailed(format!("Telnyx transfer outbound call failed: {}", e)) + })?; + + let status = resp.status().as_u16(); + let body = resp.text().await.unwrap_or_default(); + + match status { + 200 | 201 => { + let json: serde_json::Value = serde_json::from_str(&body).unwrap_or_default(); + let new_call_id = json + .pointer("/data/call_control_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + info!( + "[telnyx] Supervised transfer initiated: original={} → destination={}, conference={}, new_call_id={}", + original_call_id, destination, conference_name, new_call_id + ); + Ok(new_call_id) + } + _ => { + error!( + "[telnyx] Failed to initiate transfer outbound call: 
status={}, body={}", + status, body + ); + Err(TransportError::SendFailed(format!( + "Telnyx transfer call failed: status={}", + status + ))) + } + } + } + + /// Join the given call_control_id into a Telnyx Conference room. + /// + /// Accepts either a conference UUID (bypasses list lookup, immune to eventual + /// consistency) or a conference name (falls back to filter[name] search and + /// then creates the conference if absent). Returns the authoritative UUID. + async fn bridge_call_to_conference( + &self, + config: &super::super::config::TelephonyConfig, + call_id: &str, + conference_id_or_name: &str, + ) -> Result { + let api_key = match &config.credentials { + super::super::config::TelephonyCredentials::Telnyx { api_key, .. } => api_key.as_str(), + _ => return Err(TransportError::SendFailed("Invalid credentials for Telnyx provider".into())), + }; + + let client = reqwest::Client::new(); + + let is_uuid = uuid::Uuid::parse_str(conference_id_or_name).is_ok(); + let mut existing_conf_id = None; + + if is_uuid { + info!( + "[telnyx] Provided conference identifier {} is a UUID, bypassing list search", + conference_id_or_name + ); + existing_conf_id = Some(conference_id_or_name.to_string()); + } else { + // Step 1: Check if the conference already exists by name. 
+ let filter_url = format!("https://api.telnyx.com/v2/conferences?filter[name]={}", conference_id_or_name); + let list_resp = client + .get(&filter_url) + .header("Authorization", format!("Bearer {}", api_key)) + .send() + .await + .map_err(|e| { + TransportError::SendFailed(format!("Telnyx list conferences failed: {}", e)) + })?; + + let list_status = list_resp.status().as_u16(); + let list_body = list_resp.text().await.unwrap_or_default(); + + if matches!(list_status, 200 | 201) { + let json: serde_json::Value = serde_json::from_str(&list_body).unwrap_or_default(); + if let Some(data) = json.pointer("/data").and_then(|v| v.as_array()) { + if !data.is_empty() { + existing_conf_id = data[0].pointer("/id").and_then(|v| v.as_str()).map(|s| s.to_string()); + } + } + } + } + + if let Some(conf_id) = existing_conf_id { + // Step 2a: Conference exists. Join the call leg into the conference. + let join_endpoint = format!( + "https://api.telnyx.com/v2/conferences/{}/actions/join", + conf_id + ); + let join_payload = serde_json::json!({"call_control_id": call_id}); + let join_resp = client + .post(&join_endpoint) + .header("Authorization", format!("Bearer {}", api_key)) + .header("Content-Type", "application/json") + .body(join_payload.to_string()) + .send() + .await + .map_err(|e| { + TransportError::SendFailed(format!("Telnyx conference join failed: {}", e)) + })?; + + match join_resp.status().as_u16() { + 200 | 201 => { + info!( + "[telnyx] Bridged call {} into existing conference {} (id={})", + call_id, conference_id_or_name, conf_id + ); + Ok(conf_id) + } + status => { + let body = join_resp.text().await.unwrap_or_default(); + error!( + "[telnyx] Failed to bridge call {} into conference {}: status={}, body={}", + call_id, conference_id_or_name, status, body + ); + Err(TransportError::SendFailed(format!( + "Telnyx conference join failed: status={}", + status + ))) + } + } + } else { + // Step 2b: Conference doesn't exist. 
Create it with our call_control_id to automatically join on creation. + // A Telnyx conference MUST be created with a valid call_control_id. + let conf_payload = serde_json::json!({ + "name": conference_id_or_name, + "call_control_id": call_id + }); + let conf_resp = client + .post("https://api.telnyx.com/v2/conferences") + .header("Authorization", format!("Bearer {}", api_key)) + .header("Content-Type", "application/json") + .body(conf_payload.to_string()) + .send() + .await + .map_err(|e| { + TransportError::SendFailed(format!("Telnyx create conference failed: {}", e)) + })?; + + let status = conf_resp.status().as_u16(); + let conf_body = conf_resp.text().await.unwrap_or_default(); + + if matches!(status, 200 | 201) { + let json: serde_json::Value = serde_json::from_str(&conf_body).unwrap_or_default(); + let conf_id = json.pointer("/data/id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + info!( + "[telnyx] Created conference {} (id={}) and joined initial call {}", + conference_id_or_name, conf_id, call_id + ); + Ok(conf_id) + } else { + error!( + "[telnyx] Failed to create conference {} with call {}: status={}, body={}", + conference_id_or_name, call_id, status, conf_body + ); + Err(TransportError::SendFailed(format!( + "Telnyx conference creation failed: status={}", + status + ))) + } + } + } } diff --git a/voice/engine/crates/transport/src/telephony/providers/twilio.rs b/voice/engine/crates/transport/src/telephony/providers/twilio.rs index a5baeee..609dbe8 100644 --- a/voice/engine/crates/transport/src/telephony/providers/twilio.rs +++ b/voice/engine/crates/transport/src/telephony/providers/twilio.rs @@ -100,4 +100,182 @@ impl TelephonyProviderImpl for Twilio { } } } + + /// Place a new outbound call to `destination`. + /// + /// When the destination answers, Twilio automatically puts them into + /// the Conference named `conference_name` (via inline TwiML). 
+ /// The `transfer_callback_url` receives StatusCallback events so the + /// webhook handler can react to answered/busy/no-answer/failed outcomes. + /// + /// The original caller's WebSocket leg is **not touched** here — the AI + /// session continues uninterrupted while the outbound leg is ringing. + async fn initiate_supervised_transfer( + &self, + config: &super::super::config::TelephonyConfig, + original_call_id: &str, + destination: &str, + from_number: &str, + conference_name: &str, + transfer_callback_url: &str, + ) -> Result { + let (account_sid, auth_token) = match &config.credentials { + super::super::config::TelephonyCredentials::Twilio { + account_sid, + auth_token, + } => (account_sid.as_str(), auth_token.as_str()), + _ => return Err(TransportError::SendFailed("Invalid credentials for Twilio provider".into())), + }; + + let mut callback_url = transfer_callback_url.to_string(); + if !callback_url.contains("original_call_id=") { + let sep = if callback_url.contains('?') { "&" } else { "?" 
}; + callback_url.push_str(&format!("{}original_call_id={}", sep, original_call_id)); + } + + let twiml = format!( + r#" + + + {} + +"#, + conference_name + ); + + let endpoint = format!( + "https://api.twilio.com/2010-04-01/Accounts/{}/Calls.json", + account_sid + ); + + let client = reqwest::Client::new(); + let mut form_data = vec![ + ("To", destination), + ("From", from_number), + ("Twiml", twiml.as_str()), + ("Timeout", "30"), + ("StatusCallback", callback_url.as_str()), + ("StatusCallbackMethod", "POST"), + ]; + form_data.push(("StatusCallbackEvent", "initiated")); + form_data.push(("StatusCallbackEvent", "ringing")); + form_data.push(("StatusCallbackEvent", "answered")); + form_data.push(("StatusCallbackEvent", "completed")); + + let resp = client + .post(&endpoint) + .basic_auth(account_sid, Some(auth_token)) + .form(&form_data) + .send() + .await + .map_err(|e| { + TransportError::SendFailed(format!("Twilio transfer outbound call failed: {}", e)) + })?; + + let status = resp.status().as_u16(); + let body = resp.text().await.unwrap_or_default(); + + match status { + 200 | 201 => { + // Extract the new call SID from the response JSON. + let json: serde_json::Value = serde_json::from_str(&body).unwrap_or_default(); + let new_call_sid = json + .get("sid") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + info!( + "[twilio] Supervised transfer initiated: original={} → destination={}, conference={}, new_call_sid={}", + original_call_id, destination, conference_name, new_call_sid + ); + Ok(new_call_sid) + } + _ => { + error!( + "[twilio] Failed to initiate transfer outbound call: status={}, body={}", + status, body + ); + Err(TransportError::SendFailed(format!( + "Twilio transfer call failed: status={}", + status + ))) + } + } + } + + /// Move the original caller's active call into the named Conference room. + /// + /// Called by the webhook handler once the destination has answered and + /// is already waiting in the Conference. 
We update the original call's + /// TwiML to `` — this drops the caller into the room + /// without any interruption on their end (no ring, no hold music gap). + async fn bridge_call_to_conference( + &self, + config: &super::super::config::TelephonyConfig, + call_id: &str, + conference_id_or_name: &str, + ) -> Result { + let (account_sid, auth_token) = match &config.credentials { + super::super::config::TelephonyCredentials::Twilio { + account_sid, + auth_token, + } => (account_sid.as_str(), auth_token.as_str()), + _ => { + return Err(TransportError::SendFailed( + "Invalid credentials for Twilio provider".into(), + )) + } + }; + + // Twilio bridge is simple: we tell one leg to immediately execute new TwiML + // which drops them into the named conference. + let twiml = format!( + "{}", + conference_id_or_name + ); + + let endpoint = format!( + "https://api.twilio.com/2010-04-01/Accounts/{}/Calls/{}.json", + account_sid, call_id + ); + + let client = reqwest::Client::new(); + let resp = client + .post(&endpoint) + .basic_auth(account_sid, Some(auth_token)) + .form(&[("Twiml", twiml.as_str())]) + .send() + .await + .map_err(|e| { + TransportError::SendFailed(format!("Twilio conference bridge failed: {}", e)) + })?; + + match resp.status().as_u16() { + 200 | 201 => { + info!( + "[twilio] Bridged call {} into conference {}", + call_id, conference_id_or_name + ); + Ok(conference_id_or_name.to_string()) + } + 404 => { + warn!("[twilio] Call {} not found for conference bridge (404)", call_id); + Err(TransportError::SendFailed(format!( + "Twilio conference bridge failed: Call {} not found (404)", + call_id + ))) + } + status => { + let body = resp.text().await.unwrap_or_default(); + error!( + "[twilio] Failed to bridge call {} into conference {}: status={}, body={}", + call_id, conference_id_or_name, status, body + ); + Err(TransportError::SendFailed(format!( + "Twilio conference bridge failed: status={}", + status + ))) + } + } + } } diff --git 
a/voice/engine/crates/transport/src/telephony/transport.rs b/voice/engine/crates/transport/src/telephony/transport.rs index fe61357..2e4f06f 100644 --- a/voice/engine/crates/transport/src/telephony/transport.rs +++ b/voice/engine/crates/transport/src/telephony/transport.rs @@ -72,7 +72,7 @@ impl TelephonyTransport { socket: WebSocket, config: TelephonyConfig, bus_sender_rx: oneshot::Receiver>, - ) -> (TransportHandle, oneshot::Receiver>) { + ) -> (TransportHandle, oneshot::Receiver<(Option, Option)>) { let (ws_sink, ws_stream) = socket.split(); let (audio_in_tx, audio_rx) = mpsc::unbounded_channel::(); let (control_event_tx, control_rx) = mpsc::unbounded_channel(); @@ -110,10 +110,11 @@ impl TelephonyTransport { fwd_provider, )); - // ── Task 3: Handle control commands (hangup on close) ────── + // ── Task 3: Handle control commands (hangup on close, supervised transfer) ── let cmd_config = Arc::clone(&config); let cmd_provider = Arc::clone(&provider); - let cmd_handle = tokio::spawn(telephony_cmd_loop(control_cmd_rx, cmd_config, cmd_provider)); + let cmd_event_tx = control_event_tx.clone(); + let cmd_handle = tokio::spawn(telephony_cmd_loop(control_cmd_rx, cmd_config, cmd_provider, cmd_event_tx)); // ── Audio sink (sends encoded G.711 as JSON WS frames) ───── let audio_sink = TelephonyAudioSink { @@ -126,9 +127,11 @@ impl TelephonyTransport { audio_rx, audio_tx: Box::new(audio_sink), control_rx, + event_tx: control_event_tx.clone(), control_tx: control_cmd_tx, input_sample_rate: config.sample_rate, _background_tasks: vec![recv_handle, forward_handle, cmd_handle], + is_telephony: true, }; (handle, session_id_rx) @@ -149,7 +152,7 @@ async fn telephony_recv_loop( control_tx: mpsc::UnboundedSender, config: Arc, provider: Arc>, - session_id_tx: oneshot::Sender>, + session_id_tx: oneshot::Sender<(Option, Option)>, ) { use base64::Engine; let b64 = base64::engine::general_purpose::STANDARD; @@ -202,9 +205,9 @@ async fn telephony_recv_loop( custom_session_id ); - // Send 
session_id from customParameters to the server (once) + // Send session_id and call_id from start event to the server (once) if let Some(tx) = session_id_tx.take() { - let _ = tx.send(custom_session_id); + let _ = tx.send((custom_session_id, call_id.clone())); } // Store the provider-assigned IDs so outbound frames can use them @@ -384,6 +387,7 @@ async fn telephony_cmd_loop( mut cmd_rx: mpsc::UnboundedReceiver, config: Arc, provider: Arc>, + control_tx: mpsc::UnboundedSender, ) { while let Some(cmd) = cmd_rx.recv().await { match cmd { @@ -429,6 +433,106 @@ async fn telephony_cmd_loop( TransportCommand::SendMessage(msg) => { info!("[telephony:{}] Control message: {:?}", provider.name(), msg); } + TransportCommand::Transfer { destination } => { + info!("[telephony:{}] Supervised transfer command received → {}", provider.name(), destination); + + let call_id = config.get_call_id(); + let from_number = config.from_number.clone(); + let public_url = config.public_url.clone(); + + match (call_id, from_number, public_url) { + (Some(call_id), Some(from_number), Some(public_url)) => { + // Build a unique conference name from the original call SID. + let conference_name = format!("feros-transfer-{}", call_id); + // Transfer ID is the original call SID — used as the lookup key in transfer_waiters. + // The original call_id is used as the transfer_waiters lookup key. 
+ let callback_url = format!( + "{}/telephony/{}/transfer-status", + public_url.trim_end_matches('/'), + provider.name().to_lowercase() + ); + + info!( + "[telephony:{}] Initiating supervised transfer: call_id={}, from={}, conference={}, callback={}", + provider.name(), call_id, from_number, conference_name, callback_url + ); + + const MAX_ATTEMPTS: u32 = 3; + let mut transfer_initiated = false; + for attempt in 1..=MAX_ATTEMPTS { + match provider + .initiate_supervised_transfer( + &config, + &call_id, + &destination, + &from_number, + &conference_name, + &callback_url, + ) + .await + { + Ok(new_call_sid) => { + info!( + "[telephony:{}] Outbound transfer leg created: {}", + provider.name(), + new_call_sid + ); + transfer_initiated = true; + break; + } + Err(e) => { + if attempt < MAX_ATTEMPTS { + warn!( + "[telephony:{}] Transfer attempt {}/{} failed: {} — retrying", + provider.name(), attempt, MAX_ATTEMPTS, e + ); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } else { + warn!( + "[telephony:{}] Transfer failed after {} attempts: {}", + provider.name(), attempt, e + ); + } + } + } + } + + if !transfer_initiated { + // Emit a failure event so the AI can tell the user. + let _ = control_tx.send(crate::TransportEvent::TransferResult { + succeeded: false, + destination: destination.clone(), + reason: Some("Failed to place outbound transfer call".into()), + }); + } + // Do NOT break — we wait for TransferResult from the webhook or a Close command. 
+ } + (None, _, _) => { + warn!("[telephony:{}] Transfer requested but no call_id available", provider.name()); + let _ = control_tx.send(crate::TransportEvent::TransferResult { + succeeded: false, + destination, + reason: Some("No call_id available for transfer".into()), + }); + } + (_, None, _) => { + warn!("[telephony:{}] Transfer requested but no from_number configured — cannot place outbound call", provider.name()); + let _ = control_tx.send(crate::TransportEvent::TransferResult { + succeeded: false, + destination, + reason: Some("No from_number configured".into()), + }); + } + (_, _, None) => { + warn!("[telephony:{}] Transfer requested but no public_url configured — cannot build callback URL", provider.name()); + let _ = control_tx.send(crate::TransportEvent::TransferResult { + succeeded: false, + destination, + reason: Some("No public_url configured".into()), + }); + } + } + } #[cfg(feature = "webrtc")] TransportCommand::AddIceCandidate(_) => { // No-op for telephony. diff --git a/voice/engine/crates/transport/src/webrtc/connection.rs b/voice/engine/crates/transport/src/webrtc/connection.rs index fef67c9..ac114c9 100644 --- a/voice/engine/crates/transport/src/webrtc/connection.rs +++ b/voice/engine/crates/transport/src/webrtc/connection.rs @@ -53,6 +53,8 @@ pub struct WebRtcConnection { pub(crate) audio_rx: Option>, /// Channel for transport lifecycle events. pub(crate) control_rx: Option>, + /// Channel for sending events back to the session. + pub(crate) event_tx: Option>, /// Channel for sending commands to the connection loop. pub(crate) control_tx: Option>, /// Channel for sending PCM audio to the str0m event loop for Opus encoding. 
@@ -183,6 +185,7 @@ impl WebRtcConnection { let (audio_out_tx, audio_out_rx) = mpsc::unbounded_channel(); let id_for_task = id.clone(); + let control_event_tx_task = control_event_tx.clone(); // Spawn the str0m event loop as a tokio task let task_handle = tokio::spawn(async move { @@ -192,7 +195,7 @@ impl WebRtcConnection { socket, host_addr, audio_tx, - control_event_tx, + control_event_tx_task, control_cmd_rx, audio_out_rx, ) @@ -206,6 +209,7 @@ impl WebRtcConnection { id, audio_rx: Some(audio_rx), control_rx: Some(control_rx), + event_tx: Some(control_event_tx.clone()), control_tx: Some(control_cmd_tx), audio_out_tx: Some(audio_out_tx), task_handle: Some(task_handle), @@ -402,6 +406,9 @@ async fn run_rtc_loop( Err(e) => warn!("[webrtc:{tag}] Failed to parse ICE candidate {}: {}", candidate, e) } } + Some(TransportCommand::Transfer { destination }) => { + warn!("[webrtc:{tag}] Transfer command unsupported in WebRTC (destination: {})", destination); + } None => { info!("[webrtc:{}] Control channel closed", tag); return Ok(()); diff --git a/voice/engine/crates/transport/src/webrtc/transport.rs b/voice/engine/crates/transport/src/webrtc/transport.rs index e036706..805c9ba 100644 --- a/voice/engine/crates/transport/src/webrtc/transport.rs +++ b/voice/engine/crates/transport/src/webrtc/transport.rs @@ -49,6 +49,11 @@ impl WebRtcTransport { .control_tx .take() .expect("control_tx already consumed"); + let event_tx = connection + .event_tx + .take() + .expect("event_tx already consumed"); + let audio_out_tx = connection .audio_out_tx .take() @@ -79,9 +84,11 @@ impl WebRtcTransport { audio_rx, audio_tx: Box::new(audio_sink), control_rx, + event_tx, control_tx, input_sample_rate: OPUS_SAMPLE_RATE, _background_tasks: tasks, + is_telephony: false, } } } diff --git a/voice/engine/crates/transport/src/websocket.rs b/voice/engine/crates/transport/src/websocket.rs index 37194cb..37de073 100644 --- a/voice/engine/crates/transport/src/websocket.rs +++ 
b/voice/engine/crates/transport/src/websocket.rs @@ -57,9 +57,11 @@ impl WebSocketTransport { audio_rx, audio_tx: Box::new(audio_sink), control_rx, + event_tx: control_event_tx.clone(), control_tx: control_cmd_tx, input_sample_rate, _background_tasks: vec![recv_handle, forward_handle], + is_telephony: false, } } } diff --git a/voice/engine/src/main.rs b/voice/engine/src/main.rs index 05889ae..9110555 100644 --- a/voice/engine/src/main.rs +++ b/voice/engine/src/main.rs @@ -44,6 +44,7 @@ async fn main() { twilio_account_sid: settings.twilio_account_sid, twilio_auth_token: settings.twilio_auth_token, telnyx_api_key: settings.telnyx_api_key, + telnyx_connection_id: settings.telnyx_connection_id, }; let auth_secret_key = std::env::var("AUTH__SECRET_KEY").unwrap_or_default(); let state = ServerState::new(providers, telephony, auth_secret_key); diff --git a/voice/engine/src/native_session.rs b/voice/engine/src/native_session.rs index 7411e9a..13f64f1 100644 --- a/voice/engine/src/native_session.rs +++ b/voice/engine/src/native_session.rs @@ -5,7 +5,7 @@ use agent_kit::agent_backends::native::{NativeAgentEvent, NativeMultimodalBackend}; use agent_kit::providers::gemini_live::{GeminiLiveProvider, OUTPUT_SAMPLE_RATE}; -use agent_kit::swarm::AgentGraphDef; +use agent_kit::swarm::{AgentGraphDef, EscalationDestination}; use agent_kit::AgentBackend as _; use agent_kit::AgentBackendConfig; use bytes::Bytes; @@ -17,14 +17,11 @@ use voice_trace::{Event, Tracer}; use voice_transport::TransportHandle; use crate::audio_ml::vad::{VadConfig, VAD_THRESHOLD_IDLE, VAD_THRESHOLD_PLAYBACK_RAW}; -use crate::reactor::AgentAudioCursor; use crate::reactor::proc::vad::VadStage; +use crate::reactor::AgentAudioCursor; use crate::session::NativeMultimodalConfig; use crate::types::VadEvent; -use crate::utils::{AudioRingBuffer, PlaybackTracker, SAMPLE_RATE}; - -/// WebRTC Opus clock rate. 
-const WEBRTC_RATE: u32 = 48_000; +use crate::utils::{AudioRingBuffer, PlaybackTracker, SAMPLE_RATE, WEBRTC_RATE}; /// Self-contained event loop for Gemini Live native audio sessions. /// @@ -50,13 +47,14 @@ pub(crate) async fn run_native_multimodal( voice_id: String, backend_config: AgentBackendConfig, mut mic_rx: UnboundedReceiver, - transport: TransportHandle, + mut transport: TransportHandle, mut tracer: Tracer, input_sample_rate: u32, models_dir: String, recording_enabled: bool, language: String, greeting: Option, + escalation_destinations: Vec, ) { tracer.emit(Event::SessionReady); @@ -88,7 +86,8 @@ pub(crate) async fn run_native_multimodal( agent_graph.as_ref(), backend_config, voice_id.clone(), - ); + ) + .with_telephony_escalation(transport.is_telephony, escalation_destinations); let mut final_system_prompt = system_prompt; if let Some(mut greet) = greeting { greet = greet.trim().to_string(); @@ -114,9 +113,8 @@ pub(crate) async fn run_native_multimodal( // ── Resamplers ───────────────────────────────────────────────── // Input: client rate (e.g. 48kHz) → 16kHz (Gemini input requirement). - let mut in_resampler = - SoxrStreamResampler::new(input_sample_rate, SAMPLE_RATE) - .expect("Native in-resampler creation failed"); + let mut in_resampler = SoxrStreamResampler::new(input_sample_rate, SAMPLE_RATE) + .expect("Native in-resampler creation failed"); // Output: Gemini 24kHz → WebRTC 48kHz. 
let mut out_resampler = SoxrStreamResampler::new(OUTPUT_SAMPLE_RATE, WEBRTC_RATE) @@ -124,10 +122,7 @@ pub(crate) async fn run_native_multimodal( // ── Local VAD for barge-in ───────────────────────────────────── let vad_path = format!("{}/silero_vad/silero_vad.onnx", models_dir); - let mut vad = VadStage::new( - &vad_path, - VadConfig::default(), - ); + let mut vad = VadStage::new(&vad_path, VadConfig::default()); let vad_ok = vad.initialize().is_ok(); if !vad_ok { warn!("[native] VAD init failed — barge-in disabled"); @@ -139,9 +134,51 @@ pub(crate) async fn run_native_multimodal( let mut hangup_target: Option = None; let mut hangup_max_target: Option = None; + let mut holding = false; + let mut hold_phase: f32 = 0.0; + // ── Main event loop ──────────────────────────────────────────── loop { tokio::select! { + _ = tokio::time::sleep(tokio::time::Duration::from_millis(20)), if holding => { + let hold_rate = if transport.is_telephony { 8000 } else { WEBRTC_RATE }; + let pcm_bytes = crate::utils::generate_transfer_hold_tone(&mut hold_phase, hold_rate); + + let offset = tts_cursor.stamp(pcm_bytes.len()); + tracer.emit(Event::AgentAudio { + pcm: Bytes::from(pcm_bytes), + sample_rate: hold_rate, + offset_samples: offset, + }); + } + + ctrl_event = transport.control_rx.recv() => { + match ctrl_event { + Some(voice_transport::TransportEvent::TransferResult { succeeded, destination, reason }) => { + info!("[native] Transfer complete: succeeded={}, destination={:?}, reason={:?}", succeeded, destination, reason); + if succeeded { + info!("[native] Call transferred successfully. 
Hanging up."); + let _ = transport.control_tx.send(voice_transport::TransportCommand::Close); + break; + } else { + holding = false; + let fail_reason = reason.unwrap_or_else(|| "Unknown Provider Error".into()); + if let Err(e) = backend.push_transfer_failure_result(destination, fail_reason).await { + warn!("[native] Failed to send transfer failure to Gemini: {}", e); + } + } + } + Some(voice_transport::TransportEvent::Disconnected { reason }) => { + info!("[native] Transport disconnected: {}", reason); + break; + } + Some(_) => {} + None => { + info!("[native] Transport control channel closed. Terminating."); + break; + } + } + } _ = async { if let Some(target) = hangup_target { tokio::time::sleep_until(target).await; @@ -211,9 +248,11 @@ pub(crate) async fn run_native_multimodal( pending_pcm.push(samples); }); - for samples in pending_pcm { - if let Err(e) = backend.push_audio(&samples).await { - warn!("[native] push_audio error: {}", e); + if !holding { + for samples in pending_pcm { + if let Err(e) = backend.push_audio(&samples).await { + warn!("[native] push_audio error: {}", e); + } } } @@ -274,10 +313,17 @@ pub(crate) async fn run_native_multimodal( // Gemini events: audio out, transcripts, tool calls. event = backend.recv() => { match event { - Some(ev) => match ev { - NativeAgentEvent::BotAudio(samples) => { - if !bot_speaking { - bot_speaking = true; + Some(ev) => { + // Once we are holding (escalated), ignore all further backend events + // to prevent stray LLM audio, tool calls, or duplicate escalate_call attempts + // while the telephony provider routes the transfer. + if holding { + continue; + } + match ev { + NativeAgentEvent::BotAudio(samples) => { + if !bot_speaking { + bot_speaking = true; // Snap the cursor to wall-clock on the first chunk of each // new turn. This encodes the real inter-turn gap (user speech // + STT + LLM + TTS TTFB) so recording is wall-clock accurate. 
@@ -457,6 +503,35 @@ pub(crate) async fn run_native_multimodal( } } + NativeAgentEvent::EscalateCall { destination, reason } => { + info!("[native] Agent escalate_call (destination={}, reason={}) intercepted. Commencing transfer...", destination, reason); + + // Send the Transfer command to the transport layer. + // The transport is responsible for handling the SIP/PSTN mechanics. + let _ = transport.control_tx.send(voice_transport::TransportCommand::Transfer { + destination: destination.clone() + }); + + // Suppress all further backend events (audio, tool calls, etc.) + // while the telephony provider routes the transfer. + // NOTE: Do NOT call backend.interrupt() here. Gemini has just + // received the escalate_call tool result and is in the middle of + // generating a farewell audio turn. Sending activityStart races + // with that turn generation and causes Gemini to emit an empty + // response ("model output must contain either output text or tool + // calls"). The holding guard below is sufficient to drain and + // discard all further events without disrupting the provider. + holding = true; + bot_speaking = false; + + tracer.emit(Event::ToolActivity { + tool_call_id: None, + tool_name: "escalate_call".into(), + status: "completed".into(), + error_message: None, + }); + } + NativeAgentEvent::Error(msg) => { warn!("[native] Provider error: {}", msg); tracer.emit(Event::Error { @@ -465,6 +540,7 @@ pub(crate) async fn run_native_multimodal( }); } } + } None => { // Stream ended — attempt reconnect with exponential backoff. 
info!("[native] Provider stream ended — reconnecting"); diff --git a/voice/engine/src/python.rs b/voice/engine/src/python.rs index 50a7b2a..5444722 100644 --- a/voice/engine/src/python.rs +++ b/voice/engine/src/python.rs @@ -63,6 +63,7 @@ impl PySessionConfig { min_barge_in_words = 2, barge_in_timeout_ms = 800, graph_json = None, + escalation_destinations_json = None, ))] #[allow(clippy::too_many_arguments)] fn new( @@ -82,6 +83,7 @@ impl PySessionConfig { min_barge_in_words: u32, barge_in_timeout_ms: u32, graph_json: Option, + escalation_destinations_json: Option, ) -> PyResult { // Parse v3 agent graph JSON (single AgentGraphDef) if provided let agent_graph: Option = match graph_json { @@ -93,6 +95,11 @@ impl PySessionConfig { None => None, }; + let escalation_destinations: Vec = match escalation_destinations_json { + Some(json_str) => serde_json::from_str(json_str).unwrap_or_default(), + None => vec![], + }; + Ok(Self { inner: { let mut cfg = SessionConfig { @@ -117,6 +124,7 @@ impl PySessionConfig { min_barge_in_words, barge_in_timeout_ms, agent_graph: agent_graph.clone(), + escalation_destinations, // Defaults — overridden from graph below ..Default::default() }; @@ -194,6 +202,7 @@ pub struct PyServerConfig { pub default_twilio_account_sid: String, pub default_twilio_auth_token: String, pub default_telnyx_api_key: String, + pub default_telnyx_connection_id: String, /// Shared secret for HMAC-SHA256 session token validation. /// When set, all endpoints require a valid token = HMAC(secret, session_id). /// When empty, token validation is skipped (dev mode). 
@@ -221,6 +230,7 @@ impl PyServerConfig { default_twilio_account_sid = String::new(), default_twilio_auth_token = String::new(), default_telnyx_api_key = String::new(), + default_telnyx_connection_id = String::new(), auth_secret_key = String::new(), ))] #[allow(clippy::too_many_arguments)] @@ -242,6 +252,7 @@ impl PyServerConfig { default_twilio_account_sid: String, default_twilio_auth_token: String, default_telnyx_api_key: String, + default_telnyx_connection_id: String, auth_secret_key: String, ) -> Self { Self { @@ -262,6 +273,7 @@ impl PyServerConfig { default_twilio_account_sid, default_twilio_auth_token, default_telnyx_api_key, + default_telnyx_connection_id, auth_secret_key, } } @@ -320,6 +332,7 @@ impl PyVoiceServer { twilio_account_sid: config.default_twilio_account_sid.clone(), twilio_auth_token: config.default_twilio_auth_token.clone(), telnyx_api_key: config.default_telnyx_api_key.clone(), + telnyx_connection_id: config.default_telnyx_connection_id.clone(), }; let state = ServerState::new(providers, telephony, config.auth_secret_key.clone()); @@ -746,6 +759,14 @@ impl PyAgentRunner { dict.set_item("reason", reason)?; dict.set_item("content", content)?; } + AgentEvent::EscalateCall { + destination, + reason, + } => { + dict.set_item("type", "escalate_call")?; + dict.set_item("destination", destination)?; + dict.set_item("reason", reason)?; + } AgentEvent::LlmComplete { provider, model, @@ -791,6 +812,7 @@ impl PyAgentRunner { max_tokens = 32768, greeting = None, secrets = None, + escalation_destinations_json = None, ))] #[allow(clippy::too_many_arguments)] fn new( @@ -806,6 +828,7 @@ impl PyAgentRunner { max_tokens: u32, greeting: Option, secrets: Option>, + escalation_destinations_json: Option<&str>, ) -> PyResult { let runtime = Arc::new( tokio::runtime::Builder::new_multi_thread() @@ -825,6 +848,12 @@ impl PyAgentRunner { None => None, }; + // Parse escalation destinations + let escalation_destinations: Vec = match escalation_destinations_json { + 
Some(json_str) => serde_json::from_str(json_str).unwrap_or_default(), + None => vec![], + }; + // Build LLM provider let provider = build_runner_llm_provider(llm_provider, llm_api_key, llm_url, llm_model)?; @@ -858,7 +887,9 @@ impl PyAgentRunner { py_hook, artifact: ArtifactInterceptor::new(ArtifactStore::new()), }; - backend = backend.with_interceptor(Arc::new(composite)); + backend = backend + .with_interceptor(Arc::new(composite)) + .with_telephony_escalation(true, escalation_destinations); // Set system prompt backend.set_system_prompt(system_prompt.to_string()); diff --git a/voice/engine/src/reactor/mod.rs b/voice/engine/src/reactor/mod.rs index 4fb1214..7f5f0b0 100644 --- a/voice/engine/src/reactor/mod.rs +++ b/voice/engine/src/reactor/mod.rs @@ -121,6 +121,14 @@ impl TransportControl { pub fn close(&self) { let _ = self.tx.send(TransportCommand::Close); } + + /// Forward the active telephony call to a new destination. + /// + /// Only meaningful for telephony transports (Twilio/Telnyx). For WebSocket/WebRTC, + /// this is a no-op. The transport cmd loop handles the actual REST call. + pub fn transfer(&self, destination: String) { + let _ = self.tx.send(TransportCommand::Transfer { destination }); + } } // ── Agent Audio Cursor ──────────────────────────────────────────── @@ -286,7 +294,10 @@ pub struct Reactor { // ── Session lifecycle ───────────────────────────────────── /// High-level session phase. Replaces `had_first_interaction` and /// `greeting_in_progress` with a compile-time-safe 3-variant enum. - pub(super) session_phase: SessionPhase, + pub(crate) session_phase: SessionPhase, + /// Call is holding waiting for transfer success/failure + pub(crate) holding: bool, + pub(crate) hold_phase: f32, // ── LLM generation phase (LLM → TTS streaming state) ──────────────── /// Tracks the current LLM output-to-TTS streaming state. 
@@ -326,6 +337,8 @@ pub struct Reactor { pub(super) should_exit: bool, // ── Transport control ───────────────────────────────────── + /// Receives events (like TransferResult) from the transport layer. + pub(super) control_rx: tokio::sync::mpsc::UnboundedReceiver, /// Sends commands (e.g. Close) back to the transport layer. /// Used to trigger telephony hangup via REST API. pub(super) transport: TransportControl, @@ -362,6 +375,11 @@ pub struct Reactor { pub(super) session_ended_emitted: bool, /// Temporary observability state for the opening greeting TTS. pub(super) greeting_observability: Option, + + // ── Call Escalation ─────────────────────────────────── + /// When set, `initiate_shutdown` will send `TransportCommand::Transfer` + /// with this destination before `Close`. Only meaningful for telephony. + pub(super) pending_transfer_destination: Option, } pub(super) struct GreetingObservability { @@ -379,6 +397,7 @@ impl Reactor { config: SessionConfig, backend: Box, audio_rx: mpsc::UnboundedReceiver, + control_rx: tokio::sync::mpsc::UnboundedReceiver, mut tracer: Tracer, stt_config: SttProviderConfig, tts_config: TtsProviderConfig, @@ -451,6 +470,8 @@ impl Reactor { audio_rx, tracer, session_phase: SessionPhase::Nascent, + holding: false, + hold_phase: 0.0, llm_turn: session::LlmTurnPhase::Idle, hang_up: session::HangUpPhase::Idle, tts_provider_config: tts_config, @@ -459,6 +480,7 @@ impl Reactor { bot_audio_sent: false, playback: PlaybackTracker::new(output_sample_rate), should_exit: false, + control_rx, transport, vad_silence_ms, turn_text_len: 0, @@ -468,6 +490,7 @@ impl Reactor { tts_cursor: AgentAudioCursor::new(output_sample_rate), session_ended_emitted: false, greeting_observability: None, + pending_transfer_destination: None, } } @@ -679,6 +702,52 @@ impl Reactor { // else: stale chunk (cancelled context) — drop silently } + _ = tokio::time::sleep(tokio::time::Duration::from_millis(20)), if self.holding => { + let hold_rate = 
self.config.output_sample_rate; + let pcm_bytes = crate::utils::generate_transfer_hold_tone(&mut self.hold_phase, hold_rate); + + let offset = self.tts_cursor.stamp(pcm_bytes.len()); + + self.tracer.emit(voice_trace::Event::AgentAudio { + pcm: bytes::Bytes::from(pcm_bytes), + sample_rate: hold_rate, + offset_samples: offset, + }); + } + + ctrl_event = self.control_rx.recv() => { + match ctrl_event { + Some(voice_transport::TransportEvent::TransferResult { succeeded, destination, reason }) => { + info!("[reactor] Transfer complete: succeeded={}, destination={:?}, reason={:?}", succeeded, destination, reason); + if succeeded { + info!("[reactor] Call transferred successfully. Hanging up."); + self.should_exit = true; // exit normally + } else { + self.holding = false; + let fail_reason = reason.unwrap_or_else(|| "Unknown Provider Error".into()); + // Inject the failure back into the conversation so the LLM + // knows the transfer did not complete and can retry or apologise. + let failure_msg = format!( + "[SYSTEM] Call transfer to {} failed: {}. Please inform the caller and offer alternatives.", + destination, fail_reason + ); + self.llm.add_user_message(failure_msg); + self.cancel_pipeline(); + self.start_idle_timer(); + } + } + Some(voice_transport::TransportEvent::Disconnected { reason }) => { + info!("[reactor] Transport disconnected: {}", reason); + self.should_exit = true; + } + Some(_) => {} + None => { + info!("[reactor] Transport control channel closed. 
Terminating."); + self.should_exit = true; + } + } + } + // ── Timers (DelayQueue — one source for all timers) ── Some(expired) = self.timers.next() => { let timer_key: TimerKey = expired.into_inner(); diff --git a/voice/engine/src/reactor/proc/llm.rs b/voice/engine/src/reactor/proc/llm.rs index b7b82d1..46793fa 100644 --- a/voice/engine/src/reactor/proc/llm.rs +++ b/voice/engine/src/reactor/proc/llm.rs @@ -43,6 +43,13 @@ fn agent_event_to_llm(event: AgentEvent) -> LlmEvent { AgentEvent::Error(e) => LlmEvent::Error(e), AgentEvent::HangUp { reason, content } => LlmEvent::HangUp { reason, content }, AgentEvent::OnHold { duration_secs } => LlmEvent::OnHold { duration_secs }, + AgentEvent::EscalateCall { + destination, + reason, + } => LlmEvent::EscalateCall { + destination, + reason, + }, AgentEvent::LlmComplete { provider, model, diff --git a/voice/engine/src/reactor/session.rs b/voice/engine/src/reactor/session.rs index e4fa9b4..61946f7 100644 --- a/voice/engine/src/reactor/session.rs +++ b/voice/engine/src/reactor/session.rs @@ -758,7 +758,38 @@ impl Reactor { // any preceding text), close immediately if !self.tts.is_active() { info!("[reactor] hang_up: TTS not active — shutting down immediately"); - self.initiate_shutdown("no pending TTS"); + self.finalize_hang_up("no pending TTS"); + } + } + + LlmEvent::EscalateCall { + destination, + reason, + } => { + info!( + "[reactor] Agent escalate_call: {} → {} (tts_active={})", + reason, + destination, + self.tts.is_active() + ); + self.tracer.trace("EscalateCall"); + self.tracer.emit(Event::ToolActivity { + tool_call_id: None, + tool_name: "escalate_call".into(), + status: "executing".into(), + error_message: None, + }); + + // Store the destination; initiate_shutdown will send Transfer before Close. + self.pending_transfer_destination = Some(destination); + + // Reuse the hang-up state machine: drain TTS first, then shutdown. 
+ self.tts.flush(); + self.hang_up = HangUpPhase::WaitingForTts; + + if !self.tts.is_active() { + info!("[reactor] escalate_call: TTS not active — transferring immediately"); + self.finalize_hang_up("escalate_call (no pending TTS)"); } } @@ -865,9 +896,7 @@ impl Reactor { if matches!(self.hang_up, HangUpPhase::WaitingForTts) { let delay = remaining; if delay.is_zero() { - self.initiate_shutdown( - "TTS finished after hang_up (no playback remaining)", - ); + self.finalize_hang_up("TTS finished after hang_up (no playback remaining)"); } else { info!( "[reactor] hang_up: delaying shutdown by {:?} for client playback", @@ -1044,12 +1073,13 @@ impl Reactor { info!("[reactor] Pipeline cancelled"); } - /// Initiate a clean session shutdown (agent-initiated hang-up). + /// Initiate a clean session shutdown (agent-initiated hang-up or escalation). /// /// 1. Emits `SessionEnded` for all subscribers (UI, OTel, etc.) /// 2. Cancels the active pipeline (LLM + TTS) - /// 3. Tells the transport layer to close (triggers telephony REST hangup) - /// 4. Sets `should_exit` to break the reactor loop on the next iteration + /// 3. If a call escalation is pending, sends `TransportCommand::Transfer` first. + /// 4. Tells the transport layer to close (triggers telephony REST hangup) + /// 5. Sets `should_exit` to break the reactor loop on the next iteration pub(super) fn initiate_shutdown(&mut self, reason: &str) { info!("[reactor] Initiating shutdown: {}", reason); self.notify_hooks(|p| p.on_session_end()); @@ -1062,6 +1092,20 @@ impl Reactor { self.should_exit = true; } + /// Actually tear down the session OR initiate a transfer if one is pending. + /// Called when the HangUpPhase has completed waiting for the client to drain. + pub(super) fn finalize_hang_up(&mut self, reason: &str) { + if let Some(dest) = self.pending_transfer_destination.take() { + info!("[reactor] Escalation: transferring call to {}", dest); + self.transport.transfer(dest); + // Do NOT shut down. 
The transport will report TransferResult via control_rx. + // If it succeeds, reactor terminates. If it fails, session resumes. + self.holding = true; + } else { + self.initiate_shutdown(reason); + } + } + // ── TTS turn management ─────────────────────────────────────── /// Start TTS for a new turn — reuses session-level WS connection when diff --git a/voice/engine/src/reactor/timers.rs b/voice/engine/src/reactor/timers.rs index f35699d..238268f 100644 --- a/voice/engine/src/reactor/timers.rs +++ b/voice/engine/src/reactor/timers.rs @@ -368,7 +368,7 @@ impl Reactor { self.hang_up = HangUpPhase::Idle; } info!("[reactor] HangUpDelay expired — client playback should be done"); - self.initiate_shutdown("hang_up playback delay expired"); + self.finalize_hang_up("hang_up playback delay expired"); } } } diff --git a/voice/engine/src/server.rs b/voice/engine/src/server.rs index de2e1eb..815e335 100644 --- a/voice/engine/src/server.rs +++ b/voice/engine/src/server.rs @@ -43,8 +43,10 @@ use voice_trace::{Event, Tracer}; use voice_transport::websocket::WebSocketTransport; use voice_transport::{TransportCommand, TransportHandle}; +#[cfg(any(feature = "webrtc", feature = "telephony"))] +use axum::routing::post; #[cfg(feature = "webrtc")] -use axum::{routing::post, Json}; +use axum::Json; #[cfg(feature = "webrtc")] use voice_transport::webrtc::{ ice_provider_from_config, IceConfig, IceProvider, WebRtcConnection, WebRtcTransport, @@ -52,6 +54,8 @@ use voice_transport::webrtc::{ #[cfg(feature = "webrtc")] use voice_transport::TransportEvent; +#[cfg(feature = "telephony")] +use voice_transport::telephony::providers::create_provider as create_telephony_provider; #[cfg(feature = "telephony")] use voice_transport::telephony::{ TelephonyConfig, TelephonyCredentials as TelephonyTransportCredentials, TelephonyEncoding, @@ -105,6 +109,7 @@ pub struct TelephonyCredentials { pub twilio_account_sid: String, pub twilio_auth_token: String, pub telnyx_api_key: String, + pub 
telnyx_connection_id: String, } // ── Registered Session ────────────────────────────────────────── @@ -149,6 +154,16 @@ pub struct RegisteredSession { /// to build `TelephonyConfig` instead of falling back to the global /// `ServerState.telephony_creds` (which is always empty in new deployments). pub telephony_creds: Option, + /// Webhook base URL for provider callbacks (e.g., transfers). + /// + /// Populated by voice-server during session registration from the database. + pub webhook_base_url: Option, + /// The E.164 phone number that received this call (e.g. `"+18005551234"`). + /// + /// Required for supervised conference-based transfer: the transport places + /// a new outbound call *from* this number to the destination. Populated by + /// voice-server from the Twilio/Telnyx webhook `To` parameter. + pub from_number: Option, } impl RegisteredSession { @@ -228,6 +243,24 @@ type HmacSha256 = Hmac; // ── Hybrid Session Entry ───────────────────────────────────────── +/// Bound state for a telephony supervised transfer in progress. +/// +/// Stored in `ServerState::transfer_waiters`, keyed by the original call SID. +/// When the destination call leg fires a StatusCallback webhook, this struct +/// provides everything needed to either: +/// - bridge the original caller into the Conference (on success), or +/// - report failure back to the AI session (on busy/no-answer/failed). +#[cfg(feature = "telephony")] +pub struct TransferWaiter { + /// Channel to deliver a `TransferResult` event to the active AI session. + pub event_tx: tokio::sync::mpsc::UnboundedSender, + /// Twilio/Telnyx credentials needed to call the bridge REST endpoint. + pub credentials: TelephonyTransportCredentials, + /// Name of the Conference room (e.g. `"feros-transfer-CA…"`). + /// Used in `bridge_call_to_conference` when the destination answers. + pub conference_name: String, +} + /// Bound state for a hybrid WebRTC+WS voice session. 
/// /// In hybrid mode the WebRTC connection carries audio and the UI WebSocket @@ -256,6 +289,15 @@ pub struct ServerState { /// Consumed by `forward_ws_events` on UI WebSocket close to send a /// clean shutdown signal to the reactor via `TransportCommand::Close`. pub(crate) hybrid_sessions: Arc>, + /// Active telephony transfer waiters (webhook bridging). + /// + /// Keyed by the **original call SID** (e.g. Twilio `CallSid`). + /// Each entry stores everything needed to: + /// (a) route a `TransferResult` back to the active AI session, and + /// (b) bridge the original caller into the Conference when the destination + /// answers (via `provider.bridge_call_to_conference`). + #[cfg(feature = "telephony")] + pub transfer_waiters: Arc>, /// Fallback provider URLs (used when session is configured inline). pub default_providers: ProviderConfig, /// Telephony credentials (Twilio/Telnyx). @@ -313,6 +355,8 @@ impl ServerState { Self { sessions: Arc::new(DashMap::new()), hybrid_sessions: Arc::new(DashMap::new()), + #[cfg(feature = "telephony")] + transfer_waiters: Arc::new(DashMap::new()), default_providers: providers, telephony_creds: telephony, auth_secret_key: Arc::new(auth_secret_key), @@ -404,7 +448,15 @@ pub fn build_router(state: ServerState) -> Router { { app = app .route("/telephony/twilio", get(telephony_twilio_handler)) - .route("/telephony/telnyx", get(telephony_telnyx_handler)); + .route( + "/telephony/twilio/transfer-status", + post(telephony_twilio_transfer_status_handler), + ) + .route("/telephony/telnyx", get(telephony_telnyx_handler)) + .route( + "/telephony/telnyx/transfer-status", + post(telephony_telnyx_transfer_status_handler), + ); info!("Telephony WS enabled at /telephony/twilio, /telephony/telnyx"); } @@ -435,6 +487,17 @@ pub async fn run_server(addr: SocketAddr, state: ServerState) { if removed > 0 { info!("Cleaned up {} expired session(s)", removed); } + // Also evict any transfer waiters whose session is gone. 
+ // This handles the edge case where a session expires before + // the Twilio transfer-status callback ever fires. + #[cfg(feature = "telephony")] + state.transfer_waiters.retain(|id, tx| { + let alive = !tx.event_tx.is_closed(); + if !alive { + info!("Removing closed transfer waiter for session {}", id); + } + alive + }); } }); } @@ -1317,6 +1380,391 @@ async fn telephony_twilio_handler( .into_response() } +#[cfg(feature = "telephony")] +pub async fn telephony_twilio_transfer_status_handler( + State(state): State, + headers: axum::http::HeaderMap, + axum::extract::Host(host): axum::extract::Host, + axum::extract::OriginalUri(uri): axum::extract::OriginalUri, + axum::extract::Query(query): axum::extract::Query>, + axum::extract::Form(form): axum::extract::Form>, +) -> impl IntoResponse { + // Twilio's StatusCallback for the *outbound* transfer leg sends: + // CallSid = SID of the outbound transfer call leg + // CallStatus = initiated | ringing | in-progress | completed | busy | no-answer | failed + // ParentCallSid = original caller's CallSid (the one in transfer_waiters) + // + // We key our waiters on the *original* caller's SID (ParentCallSid), not the + // SID of the outbound transfer leg. + let outbound_call_sid = form.get("CallSid").cloned().unwrap_or_default(); + let call_status = form.get("CallStatus").cloned().unwrap_or_default(); + + // ParentCallSid is set by Twilio ONLY when the outbound call was placed by us via in TwiML. + // For REST API initiated calls, it is omitted, so we rely on the custom query parameter injected. 
+ let mut original_call_sid = query.get("original_call_id").cloned().unwrap_or_default(); + if original_call_sid.is_empty() { + original_call_sid = form.get("ParentCallSid").cloned().unwrap_or_default(); + } + + if original_call_sid.is_empty() { + warn!("[telephony] No ParentCallSid or query parameter in transfer status callback — cannot route result"); + return StatusCode::OK.into_response(); + } + + // ── Twilio Webhook Security (Signature Validation) ── + let auth_token = if let Some(waiter) = state.transfer_waiters.get(&original_call_sid) { + match &waiter.credentials { + voice_transport::telephony::config::TelephonyCredentials::Twilio { auth_token, .. } => auth_token.clone(), + _ => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), + } + } else { + // Fallback to global config if waiter expired/missing + state.telephony_creds.twilio_auth_token.clone() + }; + + if !auth_token.is_empty() { + let signature = headers.get("x-twilio-signature").and_then(|v| v.to_str().ok()).unwrap_or_default(); + + let scheme = headers.get("x-forwarded-proto").and_then(|v| v.to_str().ok()).unwrap_or("https"); + let url = format!("{}://{}{}", scheme, host, uri); + + let mut sorted_keys: Vec<_> = form.keys().collect(); + sorted_keys.sort(); + let mut payload = url; + for k in sorted_keys { + payload.push_str(k); + payload.push_str(&form[k]); + } + + use hmac::{Hmac, Mac}; + use sha1::Sha1; + use base64::{Engine as _, engine::general_purpose::STANDARD as B64}; + + type HmacSha1 = Hmac; + if let Ok(mut mac) = HmacSha1::new_from_slice(auth_token.as_bytes()) { + mac.update(payload.as_bytes()); + let computed = B64.encode(mac.finalize().into_bytes()); + if computed != signature { + warn!("[telephony/twilio] Invalid Twilio webhook signature: expected {} got {}", computed, signature); + return StatusCode::UNAUTHORIZED.into_response(); + } + } + } + + info!( + "[telephony] Transfer status callback: original={} outbound={} status={}", + original_call_sid, outbound_call_sid, call_status 
+ ); + + // Ignore transient states; only react to terminal / bridge events. + match call_status.as_str() { + "initiated" | "ringing" => { + // Still ringing — nothing to do yet. + info!( + "[telephony] Transfer outbound call {} is {}", + outbound_call_sid, call_status + ); + return StatusCode::OK.into_response(); + } + _ => {} + } + + let waiter_opt = state.transfer_waiters.get(&original_call_sid); + let Some(waiter) = waiter_opt else { + warn!( + "[telephony] Transfer status callback for unknown original call {}", + original_call_sid + ); + return StatusCode::OK.into_response(); + }; + + match call_status.as_str() { + "in-progress" => { + // Destination answered and is now in the Conference. + // Step: bridge the original caller into the same Conference room. + let provider = create_telephony_provider(&waiter.credentials); + + // Build a minimal TelephonyConfig with just credentials for the REST call. + let bridge_config = TelephonyConfig { + credentials: waiter.credentials.clone(), + ..TelephonyConfig::default() + }; + + info!( + "[telephony] Destination answered — bridging original call {} → conference {}", + original_call_sid, waiter.conference_name + ); + + match provider + .bridge_call_to_conference( + &bridge_config, + &original_call_sid, + &waiter.conference_name, + ) + .await + { + Ok(_) => { + info!( + "[telephony] Bridge succeeded for call {} → conference {}", + original_call_sid, waiter.conference_name + ); + let event = voice_transport::TransportEvent::TransferResult { + succeeded: true, + destination: outbound_call_sid, + reason: None, + }; + if let Err(e) = waiter.event_tx.send(event) { + warn!( + "[telephony] Failed to send TransferResult to session: {}", + e + ); + } + } + Err(e) => { + warn!( + "[telephony] Conference bridge failed for call {}: {}", + original_call_sid, e + ); + let event = voice_transport::TransportEvent::TransferResult { + succeeded: false, + destination: outbound_call_sid, + reason: Some(format!("Conference bridge failed: {}", 
e)), + }; + let _ = waiter.event_tx.send(event); + } + } + // Bridge attempt complete (success or failure) — remove the waiter. + drop(waiter); + state.transfer_waiters.remove(&original_call_sid); + } + "completed" => { + // The outbound leg completed (destination hung up after being in conference). + // This is a normal terminal state — remove the waiter silently. + info!( + "[telephony] Transfer outbound leg {} completed normally", + outbound_call_sid + ); + drop(waiter); + state.transfer_waiters.remove(&original_call_sid); + } + "busy" => { + info!("[telephony] Transfer to {} — busy", outbound_call_sid); + let event = voice_transport::TransportEvent::TransferResult { + succeeded: false, + destination: outbound_call_sid, + reason: Some("busy".into()), + }; + let _ = waiter.event_tx.send(event); + drop(waiter); + state.transfer_waiters.remove(&original_call_sid); + } + "no-answer" => { + info!("[telephony] Transfer to {} — no answer", outbound_call_sid); + let event = voice_transport::TransportEvent::TransferResult { + succeeded: false, + destination: outbound_call_sid, + reason: Some("no-answer".into()), + }; + let _ = waiter.event_tx.send(event); + drop(waiter); + state.transfer_waiters.remove(&original_call_sid); + } + "failed" | _ => { + warn!( + "[telephony] Transfer to {} failed with status={}", + outbound_call_sid, call_status + ); + let event = voice_transport::TransportEvent::TransferResult { + succeeded: false, + destination: outbound_call_sid, + reason: Some(call_status), + }; + let _ = waiter.event_tx.send(event); + drop(waiter); + state.transfer_waiters.remove(&original_call_sid); + } + } + + StatusCode::OK.into_response() +} + +#[cfg(feature = "telephony")] +pub async fn telephony_telnyx_transfer_status_handler( + State(state): State, + axum::Json(json): axum::Json, +) -> impl IntoResponse { + let event_type = json + .pointer("/data/event_type") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + + use base64::Engine; + + // Extract the original 
caller SID from client_state + let client_state_b64 = json + .pointer("/data/payload/client_state") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + + let state_json: serde_json::Value = base64::engine::general_purpose::STANDARD + .decode(client_state_b64) + .ok() + .and_then(|decoded| serde_json::from_slice(&decoded).ok()) + .unwrap_or_default(); + + let original_call_sid = state_json + .get("original_call_id") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + + let outbound_call_sid = json + .pointer("/data/payload/call_control_id") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + + info!( + "[telephony:telnyx] Transfer webhook: original={} outbound={} event={}", + original_call_sid, outbound_call_sid, event_type + ); + + if original_call_sid.is_empty() { + return StatusCode::OK.into_response(); + } + + // Ignore noise. + match event_type { + "call.initiated" | "call.ringing" => { + return StatusCode::OK.into_response(); + } + _ => {} + } + + let waiter_opt = state.transfer_waiters.get(&original_call_sid); + let Some(waiter) = waiter_opt else { + warn!( + "[telephony:telnyx] Transfer status callback for unknown original call {}", + original_call_sid + ); + return StatusCode::OK.into_response(); + }; + + match event_type { + "call.answered" => { + let provider = create_telephony_provider(&waiter.credentials); + let bridge_config = TelephonyConfig { + credentials: waiter.credentials.clone(), + ..TelephonyConfig::default() + }; + + info!( + "[telephony:telnyx] Destination answered — bridging outbound {} and original {} → conference {}", + outbound_call_sid, original_call_sid, waiter.conference_name + ); + + let res_outbound = provider + .bridge_call_to_conference( + &bridge_config, + &outbound_call_sid, + &waiter.conference_name, + ) + .await; + + let conf_id_to_join = match res_outbound { + Ok(cid) => cid, + Err(e) => { + error!( + "[telephony:telnyx] Failed to bridge outbound call {}: {:?}", + outbound_call_sid, e + ); + 
let event = voice_transport::TransportEvent::TransferResult { + succeeded: false, + destination: outbound_call_sid.clone(), + reason: Some("bridge-outbound-failed".into()), + }; + let _ = waiter.event_tx.send(event); + drop(waiter); + state.transfer_waiters.remove(&original_call_sid); + return StatusCode::OK.into_response(); + } + }; + + let res_original = provider + .bridge_call_to_conference( + &bridge_config, + &original_call_sid, + &conf_id_to_join, + ) + .await; + + match res_original { + Ok(_) => { + info!( + "[telephony:telnyx] Bridge succeeded for both calls → conference {}", + waiter.conference_name + ); + let event = voice_transport::TransportEvent::TransferResult { + succeeded: true, + destination: outbound_call_sid, + reason: None, + }; + if let Err(e) = waiter.event_tx.send(event) { + warn!( + "[telephony:telnyx] Failed to send TransferResult to session: {}", + e + ); + } + } + Err(e) => { + error!( + "[telephony:telnyx] Failed to bridge original call: {:?}", + e + ); + // Hang up the outbound leg — it's in a conference nobody will join + if let Err(hangup_err) = provider.hangup(&bridge_config, &outbound_call_sid).await { + warn!("[telephony:telnyx] Failed to hang up outbound leg {} after bridge failure: {:?}", outbound_call_sid, hangup_err); + } + let event = voice_transport::TransportEvent::TransferResult { + succeeded: false, + destination: outbound_call_sid.clone(), + reason: Some("bridge-original-failed".into()), + }; + let _ = waiter.event_tx.send(event); + } + } + // Bridge attempt complete — remove the waiter regardless of outcome. + drop(waiter); + state.transfer_waiters.remove(&original_call_sid); + } + "call.hangup" => { + let hangup_cause = json + .pointer("/data/payload/sip_hangup_cause") + .and_then(|v| v.as_str()) + .unwrap_or("failed"); + + info!( + "[telephony:telnyx] Transfer to {} hung up. Cause: {}", + outbound_call_sid, hangup_cause + ); + + // For now, treat any hangup before call.answered as a failure.
+ // (If they hung up AFTER answering, the bridge would have happened already and session closed). + let event = voice_transport::TransportEvent::TransferResult { + succeeded: false, + destination: outbound_call_sid, + reason: Some(hangup_cause.into()), + }; + let _ = waiter.event_tx.send(event); + drop(waiter); + state.transfer_waiters.remove(&original_call_sid); + } + _ => {} + } + + StatusCode::OK.into_response() +} + #[cfg(feature = "telephony")] async fn telephony_telnyx_handler( ws: WebSocketUpgrade, @@ -1439,13 +1887,41 @@ async fn handle_telephony_session( } }) .unwrap_or_default(); - TelephonyTransportCredentials::Telnyx { api_key } + let connection_id = session_telephony_creds + .as_ref() + .map(|c| c.telnyx_connection_id.clone()) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| state.telephony_creds.telnyx_connection_id.clone()); + TelephonyTransportCredentials::Telnyx { + api_key, + connection_id, + } } }; - // Build the telephony config from resolved credentials + // Resolve webhook base URL from registered session (populated by voice-server from DB). + // Resolve from_number (the called/To number) for conference-based supervised transfer. + // We peek the session here before the remove later — the remove happens after transport init + // so the peek is safe; both fields are Copy-friendly (Option). + let resolved_public_url = params + .session_id + .as_ref() + .and_then(|sid| state.sessions.get(sid)) + .and_then(|reg| reg.webhook_base_url.clone()) + .filter(|u| !u.is_empty()); + + let resolved_from_number = params + .session_id + .as_ref() + .and_then(|sid| state.sessions.get(sid)) + .and_then(|reg| reg.from_number.clone()) + .filter(|s| !s.is_empty()); + + // Build the telephony config from resolved credentials. 
let config = TelephonyConfig { credentials: resolved_credentials, + public_url: resolved_public_url, + from_number: resolved_from_number, stream_id: std::sync::Mutex::new(String::new()), // populated by recv loop from "start" event call_id: std::sync::Mutex::new(None), // populated by recv loop from "start" event inbound_encoding: TelephonyQueryParams::parse_encoding(&params.inbound_encoding), @@ -1468,28 +1944,49 @@ async fn handle_telephony_session( #[cfg(feature = "otel")] voice_trace::sinks::otel::spawn_otel_subscriber(&local_tracer); + // Capture credentials before config is moved into accept() — needed for TransferWaiter. + let config_credentials = config.credentials.clone(); + let (bus_tx_oneshot_tx, bus_tx_oneshot_rx) = tokio::sync::oneshot::channel(); let (transport, session_id_rx) = TelephonyTransport::accept(socket, config, bus_tx_oneshot_rx); - // Resolve session_id: prefer query param, fall back to start message's customParameters - let effective_session_id = if let Some(sid) = params.session_id.clone() { - sid + // Resolve session_id and call_id + let (effective_session_id, call_id) = if let Some(sid) = params.session_id.clone() { + // We still need to wait for call_id from the start message so we can register the transfer waiter + let call_id = + match tokio::time::timeout(std::time::Duration::from_secs(10), session_id_rx).await { + Ok(Ok((_, cid))) => cid, + _ => None, + }; + (sid, call_id) } else { - // Wait for the Twilio "start" message to get session_id from customParameters + // Wait for the provider "start" message match tokio::time::timeout(std::time::Duration::from_secs(10), session_id_rx).await { - Ok(Ok(Some(sid))) => { + Ok(Ok((Some(sid), cid))) => { info!("[telephony] Got session_id from customParameters: {}", sid); - sid + (sid, cid) } - _ => { + Ok(Ok((None, cid))) => { warn!("[telephony] No session_id from query params or customParameters, using inline session"); - uuid::Uuid::new_v4().to_string() + (uuid::Uuid::new_v4().to_string(), cid)
+ } + _ => { + warn!("[telephony] Timed out waiting for start message"); + (uuid::Uuid::new_v4().to_string(), None) } } }; // Look up pre-registered session or use defaults - let (session_id, mut session_config, providers, secrets, refresh_handle, reg_tracer) = { + let ( + session_id, + mut session_config, + providers, + secrets, + refresh_handle, + reg_tracer, + _reg_from_number, + ) = { if let Some((_, reg)) = state.sessions.remove(&effective_session_id) { // Extract the Tracer out of the Arc>> wrapper. let reg_tracer = reg @@ -1498,13 +1995,15 @@ async fn handle_telephony_session( .and_then(|arc| arc.lock().ok().and_then(|mut guard| guard.take())); let secrets = reg.secrets.clone(); let refresh_handle = reg.take_refresh_handle(); + let from_number = reg.from_number.clone(); ( - effective_session_id, + effective_session_id.clone(), reg.config, reg.providers, secrets, refresh_handle, reg_tracer, + from_number, ) } else { warn!( @@ -1512,16 +2011,33 @@ async fn handle_telephony_session( effective_session_id ); ( - effective_session_id, + effective_session_id.clone(), SessionConfig::default(), state.default_providers.clone(), None, // No secrets for unregistered telephony sessions None, // No refresh handle None, // No pre-created Tracer + None, // No from_number for unregistered sessions ) } }; + // Register a TransferWaiter in ServerState for inbound webhook routing. + // Done after session resolution so we have from_number and credentials available. + // Keyed by the original call SID (ParentCallSid in Twilio transfer callbacks). + if let Some(ref cid) = call_id { + // Conference name is derived deterministically from the call SID. 
+ let conference_name = format!("feros-transfer-{}", cid); + let waiter = TransferWaiter { + event_tx: transport.event_tx.clone(), + credentials: config_credentials, + conference_name, + }; + state.transfer_waiters.insert(cid.clone(), waiter); + } else { + warn!("[telephony] No call_id available; transfer status callback routing will be disabled for this session."); + } + // Force 8 kHz for telephony session_config.input_sample_rate = 8000; session_config.output_sample_rate = 8000; @@ -1541,7 +2057,7 @@ async fn handle_telephony_session( let _ = bus_tx_oneshot_tx.send(session_tracer.subscribe_sender()); let summary = run_session_with_transport( - session_id, + session_id.clone(), session_config, transport, &providers, @@ -1550,6 +2066,12 @@ async fn handle_telephony_session( session_tracer, ) .await; + + // Clean up the webhook waiter + if let Some(ref cid) = call_id { + state.transfer_waiters.remove(cid); + } + info!( "[telephony] Session {} ended (agent={}, duration={}s)", summary.session_id, summary.agent_id, summary.duration_secs diff --git a/voice/engine/src/session.rs b/voice/engine/src/session.rs index 842acff..c7f15e2 100644 --- a/voice/engine/src/session.rs +++ b/voice/engine/src/session.rs @@ -5,7 +5,7 @@ use std::sync::Arc; -use agent_kit::swarm::AgentGraphDef; +use agent_kit::swarm::{AgentGraphDef, EscalationDestination}; use tracing::info; use voice_trace::Tracer; use voice_transport::TransportHandle; @@ -97,6 +97,10 @@ pub struct SessionConfig { /// When `Some`, the session bypasses STT/LLM/TTS entirely and uses /// Gemini Live's bidirectional audio WebSocket instead. pub native_multimodal: Option, + + // ── Telephony Handoff ─────────────────────────────────────── + /// SIP/Phone numbers the agent is allowed to escalate the call to. 
+ pub escalation_destinations: Vec, } impl std::fmt::Debug for SessionConfig { @@ -150,6 +154,7 @@ impl Default for SessionConfig { tts_model: String::new(), recording: RecordingConfig::default(), native_multimodal: None, + escalation_destinations: vec![], } } } @@ -251,6 +256,7 @@ impl VoiceSession { recording_enabled, config.language.clone(), config.greeting.clone(), + config.escalation_destinations.clone(), ) .await; }); @@ -269,6 +275,7 @@ impl VoiceSession { } let audio_in_rx = transport.take_audio_rx(); + let control_rx = transport.take_control_rx(); let agent_graph = config.agent_graph.clone(); let task = AgentTaskSettings::get(); @@ -284,9 +291,14 @@ impl VoiceSession { let llm_provider: Arc = Arc::from(llm_provider); let backend: Box = Box::new( - DefaultAgentBackend::new(llm_provider, agent_graph, backend_config).with_interceptor( - std::sync::Arc::new(ArtifactInterceptor::new(ArtifactStore::new())), - ), + DefaultAgentBackend::new(llm_provider, agent_graph, backend_config) + .with_telephony_escalation( + transport.is_telephony, + config.escalation_destinations.clone(), + ) + .with_interceptor(std::sync::Arc::new(ArtifactInterceptor::new( + ArtifactStore::new(), + ))), ); let transport_control_tx = transport.control_tx.clone(); @@ -295,6 +307,7 @@ impl VoiceSession { config, backend, audio_in_rx, + control_rx, tracer, stt_config, tts_config, diff --git a/voice/engine/src/settings.rs b/voice/engine/src/settings.rs index a0958a5..10eb197 100644 --- a/voice/engine/src/settings.rs +++ b/voice/engine/src/settings.rs @@ -186,6 +186,10 @@ pub struct Settings { /// Telnyx API Key (for call control). #[serde(default)] pub telnyx_api_key: String, + + /// Telnyx Connection ID / App ID. + #[serde(default)] + pub telnyx_connection_id: String, } impl Settings { diff --git a/voice/engine/src/types.rs b/voice/engine/src/types.rs index ac099d5..743050f 100644 --- a/voice/engine/src/types.rs +++ b/voice/engine/src/types.rs @@ -6,7 +6,7 @@ //! 
- Tool execution types (`ToolCallRequest`, `ToolResult`) use bytes::Bytes; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use voice_trace::LlmCompletionData; // ── Session Types ──────────────────────────────────────────────── @@ -190,6 +190,16 @@ pub enum LlmEvent { reason: String, content: Option, }, + /// The agent has requested call escalation / human handoff. + /// + /// The reactor should drain TTS, then send `TransportCommand::Transfer { destination }` + /// to the telephony transport before closing the session. + EscalateCall { + /// E.164 phone number or SIP URI to forward the call to. + destination: String, + /// Human-readable reason (for telemetry/logs). + reason: String, + }, /// The user asked to hold / pause — suppress idle shutdown until they return. OnHold { duration_secs: u32 }, /// An LLM generation has completed (for observability). @@ -287,3 +297,14 @@ mod tests { assert_eq!(tc, TurnCompletion::IncompleteLong); } } +/// A pre-configured escalation destination (PSTN number or SIP URI). +/// +/// Agents inject these into the `escalate_call` tool schema so the LLM can +/// choose the appropriate destination by name. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EscalationDestination { + /// Human-readable label shown to the LLM (e.g. "Support Team", "Billing"). + pub name: String, + /// E.164 phone number (e.g. "+18005551234") or SIP URI. + pub phone_number: String, +} diff --git a/voice/engine/src/utils.rs b/voice/engine/src/utils.rs index b07d251..2f8f702 100644 --- a/voice/engine/src/utils.rs +++ b/voice/engine/src/utils.rs @@ -6,6 +6,8 @@ use std::time::{Duration, Instant}; pub const SAMPLE_RATE: u32 = 16_000; /// TTS output sample rate. pub const TTS_SAMPLE_RATE: u32 = 24_000; +/// WebRTC Opus clock rate. +pub const WEBRTC_RATE: u32 = 48_000; /// 16-bit PCM = 2 bytes per sample. pub const SAMPLE_WIDTH: usize = 2; /// Silero VAD requires chunks of at least 512 samples at 16kHz. 
@@ -13,6 +15,22 @@ pub const FRAME_SIZE: usize = 512; /// Bytes per frame (512 samples × 2 bytes). pub const FRAME_BYTES: usize = FRAME_SIZE * SAMPLE_WIDTH; +/// Generate 20ms of 440Hz transfer hold tone (mono PCM). +/// +/// Updates the provided `phase` to maintain wave continuity across frames. +pub fn generate_transfer_hold_tone(phase: &mut f32, sample_rate: u32) -> Vec<u8> { + let samples_per_frame = (sample_rate / 50) as usize; // 20ms + let hold_freq = 440.0_f32; // A4 tone + std::iter::from_fn(|| { + let sample = (*phase * std::f32::consts::TAU).sin() * 4000.0; + *phase = (*phase + hold_freq / sample_rate as f32).fract(); + Some((sample as i16).to_le_bytes()) + }) + .take(samples_per_frame) + .flatten() + .collect() +} + // ── WAV helpers ─────────────────────────────────────────────────────────────── /// Wrap raw PCM-16 LE mono 16 kHz bytes in a minimal 44-byte WAV container. diff --git a/voice/server/src/db.rs b/voice/server/src/db.rs index bea7050..39f2057 100644 --- a/voice/server/src/db.rs +++ b/voice/server/src/db.rs @@ -30,6 +30,8 @@ pub struct AgentConfig { pub agent_id: String, /// Parsed agent graph — None for legacy config format or if parsing fails. pub agent_graph: Option, /// Parsed escalation destinations for telephony handoff.
+ pub escalation_destinations: Vec, // LLM (from provider_configs) pub llm_provider: String, pub llm_model: String, @@ -402,6 +404,7 @@ pub async fn phone_number_credentials( .unwrap_or("") .to_string(), telnyx_api_key: String::new(), + telnyx_connection_id: String::new(), }), "telnyx" => Some(voice_engine::server::TelephonyCredentials { twilio_account_sid: String::new(), @@ -411,6 +414,11 @@ pub async fn phone_number_credentials( .and_then(|v| v.as_str()) .unwrap_or("") .to_string(), + telnyx_connection_id: creds + .get("telnyx_connection_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(), }), other => { warn!("Unknown provider '{other}' in credentials for {phone_number}"); @@ -450,7 +458,7 @@ async fn agent_config_for_id( .unwrap_or(""); let agent_graph: Option = if schema == "v3_graph" { - match serde_json::from_value(config_json) { + match serde_json::from_value(config_json.clone()) { Ok(g) => Some(g), Err(e) => { warn!("Failed to parse AgentGraphDef for agent {agent_id}: {e}"); @@ -461,6 +469,11 @@ async fn agent_config_for_id( None }; + let escalation_destinations: Vec = config_json + .get("escalation_destinations") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + // Fetch voice provider settings in a single round-trip. // This is faster than tokio::join! (which requires 3 connections) and // sequential fetches (which requires 3 RTTs). 
A single query is also @@ -500,6 +513,7 @@ async fn agent_config_for_id( Some(AgentConfig { agent_id: agent_id.to_string(), agent_graph, + escalation_destinations, llm_provider: llm.provider.clone(), llm_model: llm.model.clone(), llm_base_url: llm.base_url.clone(), diff --git a/voice/server/src/sessions.rs b/voice/server/src/sessions.rs index 36fea9b..68614d0 100644 --- a/voice/server/src/sessions.rs +++ b/voice/server/src/sessions.rs @@ -74,8 +74,8 @@ async fn create_session( let token = state.engine.sign_token(&session_id); let vault_token = state.vault_token_for(agent_uuid); let public_url = db::voice_server_url(&state.pool).await; - // Browser/WebRTC sessions have no telephony credentials (no hangup via provider API needed) - register_session(&state, &session_id, &agent, vault_token, None, "webrtc").await; + // Browser/WebRTC sessions have no telephony credentials and no from_number + register_session(&state, &session_id, &agent, vault_token, None, "webrtc", None).await; // Build the WS URL — converts https:// → wss:// automatically let ws_url = format!( diff --git a/voice/server/src/telephony.rs b/voice/server/src/telephony.rs index 281a24f..07c9f8d 100644 --- a/voice/server/src/telephony.rs +++ b/voice/server/src/telephony.rs @@ -144,6 +144,7 @@ async fn twilio_incoming( vault_token, telephony_creds, "inbound", + Some(params.to.clone()), ) .await; @@ -323,6 +324,7 @@ async fn telnyx_inbound( vault_token, telephony_creds, "inbound", + params.to.clone(), ) .await; @@ -365,6 +367,7 @@ pub async fn register_session( vault_token: Option, telephony_creds: Option, direction: &str, + from_number: Option, ) { let providers = ProviderConfig { stt_url: agent.stt_base_url.clone(), @@ -385,6 +388,7 @@ pub async fn register_session( let mut config = SessionConfig { agent_id: agent.agent_id.clone(), agent_graph: agent.agent_graph.clone(), + escalation_destinations: agent.escalation_destinations.clone(), // Seed voice_id from provider_configs (TTS row) as the default. 
// The agent graph may override this below. voice_id: agent.tts_voice_id.clone(), @@ -661,6 +665,8 @@ pub async fn register_session( created_at: Instant::now(), tracer: Some(tracer_cell), telephony_creds, + webhook_base_url: Some(state.public_url.clone()), + from_number, }, ); }