Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1465,7 +1465,50 @@ curl http://localhost:8080/kill/s.session123
curl http://localhost:8080/iceservers
```

### 7. Playbook API
### 7. Stream Events

**Endpoint:** `GET /events/{id}`

**Description:** Opens a Server-Sent Events (SSE) stream for a specific active call, delivering real-time session events and commands as they occur.

**Path Parameters:**

| Parameter | Type | Description |
|-----------|--------|----------------------|
| `id` | string | Active call/track ID |

**Response:** `text/event-stream;charset=utf-8`

The stream emits two SSE event types:

| SSE Event | Data |
|-------------|---------------------------------------------------------|
| `event` | JSON-serialized `SessionEvent` (same as WebSocket events) |
| `command` | JSON-serialized command sent to the session |

The stream closes when the call ends (channel closed). Lagged messages are silently skipped.

**Errors:**

| Status | Description |
|--------|------------------------------------|
| 404 | No active call found for given `id` |

**Usage:**
```bash
curl -N http://localhost:8080/events/{id}
```

**Example output:**
```
event: event
data: {"event":"answer","trackId":"track-abc","timestamp":1700000000}

event: command
data: {"command":"tts","text":"Hello, how can I help you?"}
```

### 8. Playbook API

#### List Playbooks

Expand Down
45 changes: 44 additions & 1 deletion src/handler/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{event::SessionEvent, media::track::TrackConfig};
use axum::{
Json, Router,
extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
response::sse::{Event, KeepAlive, Sse},
response::{IntoResponse, Response},
routing::get,
};
Expand Down Expand Up @@ -39,7 +40,8 @@ pub fn call_router() -> Router<AppState> {
.route("/call/webrtc", get(webrtc_handler))
.route("/call/sip", get(sip_handler))
.route("/list", get(list_active_calls))
.route("/kill/{id}", get(kill_active_call));
.route("/kill/{id}", get(kill_active_call))
.route("/events/{id}", get(stream_events));
r
}

Expand Down Expand Up @@ -463,6 +465,47 @@ pub(crate) async fn kill_active_call(
}
}

pub(crate) async fn stream_events(
Path(id): Path<String>,
State(state): State<AppState>,
) -> Response {
let mut rx_events;
let mut rx_commands;
{
let active_calls = state.active_calls.lock().unwrap();
if let Some(call) = active_calls.get(&id) {
rx_events = call.event_sender.subscribe();
rx_commands = call.cmd_sender.subscribe();
} else {
return (axum::http::StatusCode::NOT_FOUND, "track not active").into_response();
}
}

let stream = async_stream::stream! {
loop {
let result = tokio::select! {
r = rx_events.recv() => r.map(|e| serde_json::to_string(&e).map(|json| Event::default().event("event").data(json))),
r = rx_commands.recv() => r.map(|c| serde_json::to_string(&c).map(|json| Event::default().event("command").data(json))),
};
match result {
Ok(Ok(sse_event)) => yield Ok::<Event, serde_json::Error>(sse_event),
Ok(Err(e)) => yield Err(e.into()),
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
}
}
};

let mut response = Sse::new(stream)
.keep_alive(KeepAlive::default())
.into_response();
response.headers_mut().insert(
axum::http::header::CONTENT_TYPE,
"text/event-stream;charset=utf-8".parse().unwrap(),
);
response
}

trait IntoWsMessage {
fn into_ws_message(self) -> Result<Message, serde_json::Error>;
}
Expand Down
Loading