Skip to content

Session should be usable as a futures::Stream #4574

@dbolduc

Description

@dbolduc

Most of our types like the Paginator, Poller, storage::ReadObjectResponse are compatible with futures::Stream. e.g.

pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {

We offer an unstable-stream feature on these crates, and a conversion function to change them into something that impl futures::Stream.

(This stuff is feature gated because the futures crate is not generally available (GA), as its version is 0.x.y. We cannot have unstable types in our stable public API surface.)

We should do the same thing for a pubsub::subscriber::Session.


As followup, we can simplify the examples using streaming primitives, like timing out a stream with StreamExt::take_until:

use futures::StreamExt;

let deadline = tokio::time::sleep_until(Duration::from_secs(10));
let stream = session.into_stream().take_until(deadline);
while let Some(m, h) = stream.next().transpose()? { ... }

https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.take_until

Metadata

Metadata

Assignees

Labels

api: pubsubIssues related to the Pub/Sub API.type: feature request‘Nice-to-have’ improvement, new feature or different behavior or design.

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions