Skip to content

Streaming handlers: imperative writer API for bidi and server-streaming #86

@iainmcgin

Description

@iainmcgin

Server-streaming and bidirectional handlers must construct an impl Stream up front and return it. For finite, precomputable streams that's fine — Response::stream_ok(stream::iter(items)) reads well. But for handlers that interleave reads and writes, or that produce items as a side effect of looping over the request stream, the only options today are futures::stream::unfold with an explicit state tuple, or hand-rolling a tokio::sync::mpsc channel + ReceiverStream and spawning a task to drive it.

The eliza bidi handler is the canonical example of the friction:

async fn converse(
    &self,
    _ctx: RequestContext,
    requests: ServiceStream<OwnedView<ConverseRequestView<'static>>>,
) -> ServiceResult<ServiceStream<ConverseResponse>> {
    let response_stream = futures::stream::unfold(
        (requests, false),
        |(mut requests, session_ended)| async move {
            if session_ended {
                return None;
            }
            match requests.next().await {
                None => None,
                Some(Err(e)) => Some((Err(e), (requests, true))),
                Some(Ok(req)) => {
                    let (reply, end_session) = eliza::reply(req.sentence);
                    Some((
                        Ok(ConverseResponse { sentence: reply, ..Default::default() }),
                        (requests, end_session),
                    ))
                }
            }
        },
    );
    Response::stream_ok(response_stream)
}

unfold is one of the harder futures combinators to read and write — it inverts control flow into a (state) -> Future<Option<(item, state)>> shape. Compare to the same handler in connect-go, where the runtime hands the handler both halves of the stream as plain method calls:

func (e *elizaServer) Converse(ctx context.Context, stream *connect.BidiStream[..., ...]) error {
    for {
        req, err := stream.Receive()
        if errors.Is(err, io.EOF) { return nil }
        if err != nil { return err }
        reply, end := eliza.Reply(req.GetSentence())
        if err := stream.Send(&elizav1.ConverseResponse{Sentence: reply}); err != nil { return err }
        if end { return nil }
    }
}

gRPC-Go (stream.Send on a generated stream type) and smithy-rs (EventStreamSender) take the same imperative-writer shape. tonic forces a type FooStream associated type plus a channel-and-spawn pattern, which is widely cited as one of its biggest ergonomics complaints — we should not converge toward that.

Proposed direction

Add a ServerStreamWriter<T> handle and an alternative handler signature where the runtime owns the response channel:

async fn converse(
    &self,
    ctx: RequestContext,
    mut requests: ServiceStream<OwnedConverseRequestView>,
    responses: ServerStreamWriter<ConverseResponse>,
) -> Result<(), ConnectError> {
    while let Some(req) = requests.next().await {
        let (reply, end) = eliza::reply(req?.sentence);
        responses.send(ConverseResponse { sentence: reply, ..Default::default() }).await?;
        if end { break; }
    }
    Ok(())
}

ServerStreamWriter<T> is a thin wrapper over a bounded mpsc::Sender:

  • async fn send(&self, item: T) -> Result<(), ConnectError> — back-pressure on a full channel; error if the client has gone away.
  • async fn send_err(&self, err: ConnectError) -> Result<(), ConnectError> — push a stream-terminal error.
  • fn send_headers(&self, headers: HeaderMap) -> Result<(), ConnectError> — flush response headers before the first item; no-op (or error) after.

The dispatcher spawns the handler future and drains the channel into HTTP/2 flow control, the same way it polls the returned Stream today.

Open design questions

  • Whether the writer signature replaces, complements, or is an opt-in alternative to the current -> ServiceResult<ServiceStream<T>> form. The Stream-returning form is genuinely better for transformation handlers (stream::iter(...).map(...)) and pure server-streaming. Coexistence (two trait methods with one delegating to the other, or a codegen streaming_style= option) may be the right answer, but doubling the trait surface has costs.
  • Whether the same writer applies to plain server-streaming handlers, or only bidi.
  • Channel bound default and configurability.
  • How the writer interacts with the request OwnedView lifetime when the handler holds both across .await.

Scope

  • ServerStreamWriter<T> type + dispatcher plumbing
  • connectrpc-codegen changes to emit the alternative signature
  • examples + guide updates

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions