diff --git a/docs/api.md b/docs/api.md index ef18cda..d68ee13 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1465,7 +1465,50 @@ curl http://localhost:8080/kill/s.session123 curl http://localhost:8080/iceservers ``` -### 7. Playbook API +### 7. Stream Events + +**Endpoint:** `GET /events/{id}` + +**Description:** Opens a Server-Sent Events (SSE) stream for a specific active call, delivering real-time session events and commands as they occur. + +**Path Parameters:** + +| Parameter | Type | Description | +|-----------|--------|----------------------| +| `id` | string | Active call/track ID | + +**Response:** `text/event-stream;charset=utf-8` + +The stream emits two SSE event types: + +| SSE Event | Data | +|-------------|---------------------------------------------------------| +| `event` | JSON-serialized `SessionEvent` (same as WebSocket events) | +| `command` | JSON-serialized command sent to the session | + +The stream closes when the call ends (channel closed). Lagged messages are silently skipped. + +**Errors:** + +| Status | Description | +|--------|------------------------------------| +| 404 | No active call found for given `id` | + +**Usage:** +```bash +curl -N http://localhost:8080/events/{id} +``` + +**Example output:** +``` +event: event +data: {"event":"answer","trackId":"track-abc","timestamp":1700000000} + +event: command +data: {"command":"tts","text":"Hello, how can I help you?"} +``` + +### 8. Playbook API #### List Playbooks diff --git a/src/handler/handler.rs b/src/handler/handler.rs index 0083119..0d57d28 100644 --- a/src/handler/handler.rs +++ b/src/handler/handler.rs @@ -11,6 +11,7 @@ use crate::{event::SessionEvent, media::track::TrackConfig}; use axum::{ Json, Router, extract::{Path, Query, State, WebSocketUpgrade, ws::Message}, + response::sse::{Event, KeepAlive, Sse}, response::{IntoResponse, Response}, routing::get, }; @@ -39,7 +40,8 @@ pub fn call_router() -> Router { .route("/call/webrtc", get(webrtc_handler)) .route("/call/sip", get(sip_handler)) .route("/list", get(list_active_calls)) - .route("/kill/{id}", get(kill_active_call)); + .route("/kill/{id}", get(kill_active_call)) + .route("/events/{id}", get(stream_events)); r } @@ -463,6 +465,47 @@ pub(crate) async fn kill_active_call( } } +pub(crate) async fn stream_events( + Path(id): Path, + State(state): State, +) -> Response { + let mut rx_events; + let mut rx_commands; + { + let active_calls = state.active_calls.lock().unwrap(); + if let Some(call) = active_calls.get(&id) { + rx_events = call.event_sender.subscribe(); + rx_commands = call.cmd_sender.subscribe(); + } else { + return (axum::http::StatusCode::NOT_FOUND, "track not active").into_response(); + } + } + + let stream = async_stream::stream! { + loop { + let result = tokio::select! { + r = rx_events.recv() => r.map(|e| serde_json::to_string(&e).map(|json| Event::default().event("event").data(json))), + r = rx_commands.recv() => r.map(|c| serde_json::to_string(&c).map(|json| Event::default().event("command").data(json))), + }; + match result { + Ok(Ok(sse_event)) => yield Ok::(sse_event), + Ok(Err(e)) => yield Err(e.into()), + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }; + + let mut response = Sse::new(stream) + .keep_alive(KeepAlive::default()) + .into_response(); + response.headers_mut().insert( + axum::http::header::CONTENT_TYPE, + "text/event-stream;charset=utf-8".parse().unwrap(), + ); + response +} + trait IntoWsMessage { fn into_ws_message(self) -> Result; }