Description
Summary
- Context: The SSE (Server-Sent Events) module converts ReadEvent enum variants to axum::response::sse::Event for streaming responses to clients.
- Bug: The Done event is converted to an SSE Event without setting an event type, causing it to default to "message" instead of "done".
- Actual vs. expected: The Done event is sent with no event: field (defaulting to "message"), but it should be sent with event: done, just as every other event sets its type.
- Impact: SSE clients using addEventListener('done', ...) will not receive the Done event, breaking stream termination detection for properly structured client code.
Code with bug
The Done event conversion at api/src/v1/stream/sse.rs:
impl TryFrom<ReadEvent> for axum::response::sse::Event {
type Error = axum::Error;
fn try_from(event: ReadEvent) -> Result<Self, Self::Error> {
match event {
ReadEvent::Batch { event, data, id } => Self::default()
.event(event)
.id(id.to_string())
.json_data(data),
ReadEvent::Error { event, data } => Ok(Self::default().event(event).data(data)),
ReadEvent::Ping { event, data } => Self::default().event(event).json_data(data),
ReadEvent::Done { data } => Ok(Self::default().data(data)), // <-- BUG 🔴 Missing .event(...) call
}
}
}
The Done variant is also missing an event field:
pub enum ReadEvent {
Batch {
event: Batch, // Has event field
data: ReadBatch,
id: LastEventId,
},
Error {
event: Error, // Has event field
data: String,
},
Ping {
event: Ping, // Has event field
data: PingEventData,
},
Done {
// <-- BUG 🔴 Missing event field
data: DoneEventData,
},
}
And there's no Done event type enum created:
event!(Batch, "batch");
event!(Error, "error");
event!(Ping, "ping");
// <-- BUG 🔴 Missing: event!(Done, "done");
Evidence
Example
Consider a typical SSE client implementation:
const eventSource = new EventSource('/api/stream');
eventSource.addEventListener('batch', (e) => {
console.log('Received batch:', e.data);
});
eventSource.addEventListener('done', (e) => {
console.log('Stream complete');
eventSource.close();
});
Expected behavior:
When the server sends a ReadEvent::Done, the client receives:
event: done
data: [DONE]
The client's done event listener fires, logs "Stream complete", and closes the connection.
Actual behavior:
When the server sends a ReadEvent::Done, the client receives:
data: [DONE]
Without an event: field, this is treated as a message event (the default). The client's done event listener never fires. The stream stays open indefinitely unless the client manually checks message event data for [DONE].
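The difference between the two wire formats above can be sketched with a small standalone helper. This is only an illustration: render_frame is a hypothetical function, not part of axum or of this codebase, and it ignores the id: and retry: fields.

```rust
/// Hypothetical helper (not axum's API): render one SSE frame as text.
/// `event` is the optional event type, `data` is the payload.
fn render_frame(event: Option<&str>, data: &str) -> String {
    let mut out = String::new();
    // The `event:` field is only written when a type was set explicitly.
    if let Some(name) = event {
        out.push_str("event: ");
        out.push_str(name);
        out.push('\n');
    }
    // Each line of a multi-line payload gets its own `data:` field.
    for line in data.lines() {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    // A blank line terminates the frame.
    out.push('\n');
    out
}
```

Calling render_frame(Some("done"), "[DONE]") yields the expected frame, while render_frame(None, "[DONE]") yields the frame the server currently sends.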
Inconsistency within the codebase
Reference code: Batch, Error, and Ping events
api/src/v1/stream/sse.rs lines 167-172:
ReadEvent::Batch { event, data, id } => Self::default()
.event(event) // ✓ Sets event type
.id(id.to_string())
.json_data(data),
ReadEvent::Error { event, data } => Ok(Self::default().event(event).data(data)), // ✓ Sets event type
ReadEvent::Ping { event, data } => Self::default().event(event).json_data(data), // ✓ Sets event type
All three events call .event(event) to set the SSE event type.
Current code: Done event
api/src/v1/stream/sse.rs line 173:
ReadEvent::Done { data } => Ok(Self::default().data(data)),
Contradiction
The Done event conversion is missing the .event(...) call that all other events include. This is inconsistent with the established pattern in the codebase. Each of the other three events:
- Has an event type enum created via the event! macro (lines 87-89)
- Has an event field in its ReadEvent variant
- Calls .event(event) when converting to axum::response::sse::Event
The Done event has none of these, breaking the pattern without explanation.
Inconsistency with own spec
Reference spec
api/src/v1/stream/sse.rs line 115:
#[cfg_attr(feature = "utoipa", schema(title = "done"))]
Done {
#[cfg_attr(feature = "utoipa", schema(value_type = String, pattern = r"^\[DONE\]$"))]
data: DoneEventData,
},
Contradiction
The schema(title = "done") annotation indicates that in the API documentation (OpenAPI/utoipa), this event should be identified as "done". The title field in OpenAPI schemas is used to provide a human-readable name for the schema. By convention in this codebase, event types match their schema titles (batch/batch, error/error, ping/ping), suggesting the Done event type should also be "done".
However, the implementation doesn't create a Done event type enum or set the event type when converting to an SSE Event, creating a discrepancy between the documented schema and actual behavior.
Explanation
The SSE specification states that when no event: field is present in an SSE message, browsers treat it as event type "message". This is the default behavior. To send a custom event type, the event: field must be explicitly included.
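As a rough sketch of that defaulting rule, the function below computes the event type a client would dispatch for a single frame. dispatched_type is a hypothetical name, and the parsing is simplified: it assumes the input holds exactly one frame and relies on str::lines, so it does not handle a lone carriage return as a line ending.

```rust
/// Hypothetical helper: the event type a client would dispatch for one SSE frame.
fn dispatched_type(frame: &str) -> String {
    let mut event_type = String::new();
    for line in frame.lines() {
        // Lines that start with a colon are comments and carry no fields.
        if line.starts_with(':') {
            continue;
        }
        // Field name is everything before the first colon; one leading
        // space in the value is stripped. A line without a colon is a
        // field name with an empty value.
        let (field, value) = match line.split_once(':') {
            Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
            None => (line, ""),
        };
        if field == "event" {
            event_type = value.to_string();
        }
    }
    // With no (or an empty) `event:` field, the type falls back to "message".
    if event_type.is_empty() {
        "message".to_string()
    } else {
        event_type
    }
}
```

For the frame currently sent for Done, dispatched_type("data: [DONE]\n\n") returns "message"; with an event: done line it returns "done".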
In the current implementation:
- ReadEvent::Done has no event field in its variant
- The conversion (line 173) only calls .data(data), not .event(...)
- Result: The SSE message has no event: field
- Consequence: Browsers treat it as type "message", not "done"
This breaks clients that use addEventListener('done', ...) to listen specifically for the done event, which is the idiomatic SSE pattern for custom event types.
Full context
The ReadEvent enum in api/src/v1/stream/sse.rs represents events sent to clients via Server-Sent Events (SSE) when streaming data from S2 streams. The enum has four variants:
- Batch: Sent when delivering a batch of records to the client. Contains the records, tail information, and a LastEventId for resumption.
- Error: Sent when an error occurs during streaming.
- Ping: Sent periodically to keep the connection alive and provide a timestamp.
- Done: Sent to signal the end of the stream.
These events are converted to axum::response::sse::Event via a TryFrom implementation (lines 162-176). The axum SSE Event is then sent over the HTTP connection to the client as formatted SSE messages.
SSE is used when the client requests streaming by setting the Accept: text/event-stream header (see api/src/v1/stream/mod.rs lines 236-241). The Done event is created via the ReadEvent::done() constructor (line 154) and is likely sent when:
- A bounded read completes (all requested records delivered)
- An until timestamp is reached
- The stream ends naturally
Clients rely on receiving the correct event types to handle different events appropriately. Standard SSE client implementations use addEventListener(eventType, handler) to route events to specific handlers. The browser's SSE implementation automatically filters events by their event: field, so a done event listener will never receive messages without event: done.
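The routing described above can be modeled with a minimal listener table. This is a sketch under assumptions, not browser or library code: Listeners, add, dispatch, and on_done are all hypothetical names, and handlers return a string only so the effect is easy to observe.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the listener table behind addEventListener.
#[derive(Default)]
struct Listeners {
    by_type: HashMap<String, Vec<fn(&str) -> String>>,
}

impl Listeners {
    /// Register a handler for one event type.
    fn add(&mut self, event_type: &str, handler: fn(&str) -> String) {
        self.by_type
            .entry(event_type.to_string())
            .or_default()
            .push(handler);
    }

    /// Dispatch a payload. `event_type` is None when the frame had no
    /// `event:` field, in which case only "message" listeners are eligible.
    fn dispatch(&self, event_type: Option<&str>, data: &str) -> Vec<String> {
        let key = event_type.unwrap_or("message");
        self.by_type
            .get(key)
            .map(|handlers| handlers.iter().map(|h| h(data)).collect())
            .unwrap_or_default()
    }
}

/// Example handler, standing in for the client's 'done' listener.
fn on_done(_data: &str) -> String {
    "stream complete".to_string()
}
```

With only on_done registered under "done", dispatching the current frame (no event type) reaches no handler, while dispatching with Some("done") does.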
Why has this bug gone undetected?
This bug has gone undetected because:
- Graceful degradation: Clients can still detect stream completion by checking if the data is "[DONE]" in a generic message event listener. Many implementations likely use this workaround:
eventSource.addEventListener('message', (e) => {
  if (e.data === '[DONE]') {
    // Stream is done
    eventSource.close();
  }
});
- Limited SSE usage: The #[serde(skip)] attribute on the Done variant (line 116) suggests this event is only used in SSE contexts, not in regular JSON API responses. If most clients use the unary API or S2S protocol instead of SSE, the bug would rarely be encountered.
- Testing gaps: There may be no automated tests that verify SSE event types. The test suite might only check data content, not event type metadata.
- Documentation ambiguity: If client documentation instructs developers to listen for message events or check data for [DONE], rather than listening for done events specifically, clients would never know to expect a done event type.
- Internal consistency: Since all events in the initial commit (8a33e9b) had this same pattern, there's been no "before and after" to compare. The issue was baked in from the start.
Recommended fix
Add the missing Done event type enum, field, and conversion call:
// Line 89: Add this line after the other event! invocations
event!(Done, "done"); // <-- FIX 🟢
// Lines 115-121: Add event field to Done variant
#[cfg_attr(feature = "utoipa", schema(title = "done"))]
Done {
#[cfg_attr(feature = "utoipa", schema(inline))]
event: Done, // <-- FIX 🟢 Add this field
#[cfg_attr(feature = "utoipa", schema(value_type = String, pattern = r"^\[DONE\]$"))]
data: DoneEventData,
},
// Line 154: Update done() constructor to include event field
pub fn done() -> Self {
Self::Done {
event: Done::Done, // <-- FIX 🟢 Add this field
data: DoneEventData,
}
}
// Line 173: Add .event() call in conversion
ReadEvent::Done { event, data } => Ok(Self::default().event(event).data(data)), // <-- FIX 🟢
This makes the Done event consistent with Batch, Error, and Ping.
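To show why Done::Done can be passed straight to .event(...), here is a guess at the shape of what event! generates. The real macro definition is not shown in this issue, so the macro below is an assumption: it produces a single-variant enum that implements AsRef<str>, which is the bound axum's Event::event places on its argument. event_field is a hypothetical stand-in for that builder call.

```rust
/// Assumed shape of the `event!` macro; the actual definition may differ.
macro_rules! event {
    ($name:ident, $wire:literal) => {
        /// Single-variant marker type whose only value names the event.
        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
        enum $name {
            $name,
        }

        impl AsRef<str> for $name {
            fn as_ref(&self) -> &str {
                $wire
            }
        }
    };
}

// The invocation the fix adds.
event!(Done, "done");

/// Hypothetical stand-in for a builder method that accepts any AsRef<str>.
fn event_field<T: AsRef<str>>(event: T) -> String {
    format!("event: {}", event.as_ref())
}
```

Under this assumption, event_field(Done::Done) produces the event: done line that the current conversion omits.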