diff --git a/docs/api.md b/docs/api.md index ef18cda..78379b2 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1436,7 +1436,39 @@ If the session is not found: curl http://localhost:8080/kill/s.session123 ``` -### 6. Get ICE Servers +### 6. Send Command + +**Endpoint:** `POST /command/{id}` + +**Description:** Sends a command to a specific active call by its session ID. Accepts the same command objects as the WebSocket command interface. + +**Parameters:** +- `id` (path parameter, string): The session ID of the target call. + +**Request Body:** A command object (see [WebSocket Commands](#websocket-commands) for the full list). + +```json +{ "command": "tts", "text": "Hello, how can I help you?" } +``` + +**Response:** +```json +{ "status": "sent", "id": "s.session123" } +``` + +If the session is not found: +```json +{ "status": "not_found", "id": "s.session123" } +``` + +**Usage:** +```bash +curl -X POST http://localhost:8080/command/s.session123 \ + -H "Content-Type: application/json" \ + -d '{"command": "hangup", "reason": "normal", "initiator": "server"}' +``` + +### 7. Get ICE Servers **Endpoint:** `GET /iceservers` @@ -1465,7 +1497,50 @@ curl http://localhost:8080/kill/s.session123 curl http://localhost:8080/iceservers ``` -### 7. Playbook API +### 8. Stream Events + +**Endpoint:** `GET /events/{id}` + +**Description:** Opens a Server-Sent Events (SSE) stream for a specific active call, delivering real-time session events and commands as they occur. + +**Path Parameters:** + +| Parameter | Type | Description | +|-----------|--------|----------------------| +| `id` | string | Active call/track ID | + +**Response:** `text/event-stream;charset=utf-8` + +The stream emits two SSE event types: + +| SSE Event | Data | +|-------------|---------------------------------------------------------| +| `event` | JSON-serialized `SessionEvent` (same as WebSocket events) | +| `command` | JSON-serialized command sent to the session | + +The stream closes when the call ends (channel closed). Lagged messages are silently skipped. + +**Errors:** + +| Status | Description | +|--------|------------------------------------| +| 404 | No active call found for given `id` | + +**Usage:** +```bash +curl -N http://localhost:8080/events/{id} +``` + +**Example output:** +``` +event: event +data: {"event":"answer","trackId":"track-abc","timestamp":1700000000} + +event: command +data: {"command":"tts","text":"Hello, how can I help you?"} +``` + +### 9. Playbook API #### List Playbooks diff --git a/src/handler/handler.rs b/src/handler/handler.rs index 0083119..c41cbe5 100644 --- a/src/handler/handler.rs +++ b/src/handler/handler.rs @@ -11,8 +11,9 @@ use crate::{event::SessionEvent, media::track::TrackConfig}; use axum::{ Json, Router, extract::{Path, Query, State, WebSocketUpgrade, ws::Message}, + response::sse::{Event, KeepAlive, Sse}, response::{IntoResponse, Response}, - routing::get, + routing::{get, post}, }; use bytes::Bytes; use chrono::Utc; @@ -39,7 +40,9 @@ pub fn call_router() -> Router { .route("/call/webrtc", get(webrtc_handler)) .route("/call/sip", get(sip_handler)) .route("/list", get(list_active_calls)) - .route("/kill/{id}", get(kill_active_call)); + .route("/kill/{id}", get(kill_active_call)) + .route("/events/{id}", get(stream_events)) + .route("/command/{id}", post(send_command)); r } @@ -459,8 +462,72 @@ pub(crate) async fn kill_active_call( call.cancel_token.cancel(); Json(serde_json::json!({ "status": "killed", "id": id })).into_response() } else { - Json(serde_json::json!({ "status": "not_found", "id": id })).into_response() + ( + axum::http::StatusCode::NOT_FOUND, + Json(serde_json::json!({ "status": "not_found", "id": id })), + ) + .into_response() + } +} + +pub(crate) async fn stream_events( + Path(id): Path, + State(state): State, +) -> Response { + let mut rx_events; + let mut rx_commands; + { + let active_calls = state.active_calls.lock().unwrap(); + if let Some(call) = active_calls.get(&id) { + rx_events = call.event_sender.subscribe(); + rx_commands = call.cmd_sender.subscribe(); + } else { + return (axum::http::StatusCode::NOT_FOUND, "track not active").into_response(); + } + } + + let stream = async_stream::stream! { + loop { + let result = tokio::select! { + r = rx_events.recv() => r.map(|e| serde_json::to_string(&e).map(|json| Event::default().event("event").data(json))), + r = rx_commands.recv() => r.map(|c| serde_json::to_string(&c).map(|json| Event::default().event("command").data(json))), + }; + match result { + Ok(Ok(sse_event)) => yield Ok::(sse_event), + Ok(Err(e)) => yield Err(e.into()), + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }; + + let mut response = Sse::new(stream) + .keep_alive(KeepAlive::default()) + .into_response(); + response.headers_mut().insert( + axum::http::header::CONTENT_TYPE, + "text/event-stream;charset=utf-8".parse().unwrap(), + ); + response +} + +pub(crate) async fn send_command( + Path(id): Path, + State(state): State, + Json(command): Json, +) -> Response { + let active_calls = state.active_calls.lock().unwrap(); + if let Some(call) = active_calls.get(&id) { + if let Ok(_) = call.cmd_sender.send(command) { + return Json(serde_json::json!({ "status": "sent", "id": id })).into_response(); + } } + + ( + axum::http::StatusCode::NOT_FOUND, + Json(serde_json::json!({ "status": "not_found", "id": id })), + ) + .into_response() } trait IntoWsMessage {