From 8a2fba3fd93f07ad0bd5abd7a07e056a4bd515cf Mon Sep 17 00:00:00 2001 From: Object905 Date: Wed, 1 Apr 2026 12:53:05 +0300 Subject: [PATCH 1/3] feat: add send command endpoint --- docs/api.md | 34 +++++++++++++++++++++++++++++++++- src/handler/handler.rs | 30 +++++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/docs/api.md b/docs/api.md index ef18cda..4300272 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1436,7 +1436,39 @@ If the session is not found: curl http://localhost:8080/kill/s.session123 ``` -### 6. Get ICE Servers +### 6. Send Command + +**Endpoint:** `POST /command/{id}` + +**Description:** Sends a command to a specific active call by its session ID. Accepts the same command objects as the WebSocket command interface. + +**Parameters:** +- `id` (path parameter, string): The session ID of the target call. + +**Request Body:** A command object (see [WebSocket Commands](#websocket-commands) for the full list). + +```json +{ "command": "tts", "text": "Hello, how can I help you?" } +``` + +**Response:** +```json +{ "status": "sent", "id": "s.session123" } +``` + +If the session is not found: +```json +{ "status": "not_found", "id": "s.session123" } +``` + +**Usage:** +```bash +curl -X POST http://localhost:8080/command/s.session123 \ + -H "Content-Type: application/json" \ + -d '{"command": "hangup", "reason": "normal", "initiator": "server"}' +``` + +### 7. Get ICE Servers **Endpoint:** `GET /iceservers` diff --git a/src/handler/handler.rs b/src/handler/handler.rs index 0083119..484cf87 100644 --- a/src/handler/handler.rs +++ b/src/handler/handler.rs @@ -12,7 +12,7 @@ use axum::{ Json, Router, extract::{Path, Query, State, WebSocketUpgrade, ws::Message}, response::{IntoResponse, Response}, - routing::get, + routing::{get, post}, }; use bytes::Bytes; use chrono::Utc; @@ -39,7 +39,8 @@ pub fn call_router() -> Router { .route("/call/webrtc", get(webrtc_handler)) .route("/call/sip", get(sip_handler)) .route("/list", get(list_active_calls)) - .route("/kill/{id}", get(kill_active_call)); + .route("/kill/{id}", get(kill_active_call)) + .route("/command/{id}", post(send_command)); r } @@ -459,10 +460,33 @@ pub(crate) async fn kill_active_call( call.cancel_token.cancel(); Json(serde_json::json!({ "status": "killed", "id": id })).into_response() } else { - Json(serde_json::json!({ "status": "not_found", "id": id })).into_response() + ( + axum::http::StatusCode::NOT_FOUND, + Json(serde_json::json!({ "status": "not_found", "id": id })), + ) + .into_response() } } +pub(crate) async fn send_command( + Path(id): Path, + State(state): State, + Json(command): Json, +) -> Response { + let active_calls = state.active_calls.lock().unwrap(); + if let Some(call) = active_calls.get(&id) { + if let Ok(_) = call.cmd_sender.send(command) { + return Json(serde_json::json!({ "status": "sent", "id": id })).into_response(); + } + } + + ( + axum::http::StatusCode::NOT_FOUND, + Json(serde_json::json!({ "status": "not_found", "id": id })), + ) + .into_response() +} + trait IntoWsMessage { fn into_ws_message(self) -> Result; } From ac8ef61272a0a002af86e3c9ddc9f76a36f8c5e0 Mon Sep 17 00:00:00 2001 From: Object905 Date: Wed, 1 Apr 2026 12:58:05 +0300 Subject: [PATCH 2/3] fix: doc numbering --- docs/api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/api.md b/docs/api.md index 4300272..52821ab 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1497,7 +1497,7 @@ curl -X POST http://localhost:8080/command/s.session123 \ curl http://localhost:8080/iceservers ``` -### 7. Playbook API +### 8. Playbook API #### List Playbooks From 2ebafa0b6494bd5ae4634c4afbee0f1f1a950b4e Mon Sep 17 00:00:00 2001 From: Zverev Konstantin Date: Thu, 2 Apr 2026 04:50:21 +0300 Subject: [PATCH 3/3] feat: add sse events/commands listener (#82) --- docs/api.md | 45 +++++++++++++++++++++++++++++++++++++++++- src/handler/handler.rs | 45 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/docs/api.md b/docs/api.md index ef18cda..d68ee13 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1465,7 +1465,50 @@ curl http://localhost:8080/kill/s.session123 curl http://localhost:8080/iceservers ``` -### 7. Playbook API +### 7. Stream Events + +**Endpoint:** `GET /events/{id}` + +**Description:** Opens a Server-Sent Events (SSE) stream for a specific active call, delivering real-time session events and commands as they occur. + +**Path Parameters:** + +| Parameter | Type | Description | +|-----------|--------|----------------------| +| `id` | string | Active call/track ID | + +**Response:** `text/event-stream;charset=utf-8` + +The stream emits two SSE event types: + +| SSE Event | Data | +|-------------|---------------------------------------------------------| +| `event` | JSON-serialized `SessionEvent` (same as WebSocket events) | +| `command` | JSON-serialized command sent to the session | + +The stream closes when the call ends (channel closed). Lagged messages are silently skipped. + +**Errors:** + +| Status | Description | +|--------|------------------------------------| +| 404 | No active call found for given `id` | + +**Usage:** +```bash +curl -N http://localhost:8080/events/{id} +``` + +**Example output:** +``` +event: event +data: {"event":"answer","trackId":"track-abc","timestamp":1700000000} + +event: command +data: {"command":"tts","text":"Hello, how can I help you?"} +``` + +### 8. Playbook API #### List Playbooks diff --git a/src/handler/handler.rs b/src/handler/handler.rs index 0083119..0d57d28 100644 --- a/src/handler/handler.rs +++ b/src/handler/handler.rs @@ -11,6 +11,7 @@ use crate::{event::SessionEvent, media::track::TrackConfig}; use axum::{ Json, Router, extract::{Path, Query, State, WebSocketUpgrade, ws::Message}, + response::sse::{Event, KeepAlive, Sse}, response::{IntoResponse, Response}, routing::get, }; @@ -39,7 +40,8 @@ pub fn call_router() -> Router { .route("/call/webrtc", get(webrtc_handler)) .route("/call/sip", get(sip_handler)) .route("/list", get(list_active_calls)) - .route("/kill/{id}", get(kill_active_call)); + .route("/kill/{id}", get(kill_active_call)) + .route("/events/{id}", get(stream_events)); r } @@ -463,6 +465,47 @@ pub(crate) async fn kill_active_call( } } +pub(crate) async fn stream_events( + Path(id): Path, + State(state): State, +) -> Response { + let mut rx_events; + let mut rx_commands; + { + let active_calls = state.active_calls.lock().unwrap(); + if let Some(call) = active_calls.get(&id) { + rx_events = call.event_sender.subscribe(); + rx_commands = call.cmd_sender.subscribe(); + } else { + return (axum::http::StatusCode::NOT_FOUND, "track not active").into_response(); + } + } + + let stream = async_stream::stream! { + loop { + let result = tokio::select! { + r = rx_events.recv() => r.map(|e| serde_json::to_string(&e).map(|json| Event::default().event("event").data(json))), + r = rx_commands.recv() => r.map(|c| serde_json::to_string(&c).map(|json| Event::default().event("command").data(json))), + }; + match result { + Ok(Ok(sse_event)) => yield Ok::(sse_event), + Ok(Err(e)) => yield Err(e.into()), + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }; + + let mut response = Sse::new(stream) + .keep_alive(KeepAlive::default()) + .into_response(); + response.headers_mut().insert( + axum::http::header::CONTENT_TYPE, + "text/event-stream;charset=utf-8".parse().unwrap(), + ); + response +} + trait IntoWsMessage { fn into_ws_message(self) -> Result; }