-
-
Notifications
You must be signed in to change notification settings - Fork 28
Implement Stream for mpsc::Receiver. Add PollSender. #74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
@hawkw I'm stuck with a lifetime issue. Think this can be only solved by introducing kind of an Thanks! |
2df0ec3 to
7ef5eb2
Compare
hawkw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, sorry I didn't see this PR sooner! I'd love to merge something like this. I had a suggestion regarding the implementation.
Regarding:
@hawkw I'm stuck with a lifetime issue. Think this can be only solved by introducing kind of an
OwnedSendRefFuturethat keeps a copy ofSender<..>but this is a bigger change I'd > like to get your opinion on that first.
I'd be open to that approach. Since Senders are either internally reference counted or are internally &'static references to a static array, cloning them is inexpensive, so it seems fine to add an owned variant. Would you mind doing that in a separate PR that this PR can be rebased onto? Thanks!
I also left some comments on the proposed API. Let me know what you think of them. Thank you!
| let send_ref = Box::pin(SendRefFuture { | ||
| core: &sender.inner.core, | ||
| slots: sender.inner.slots.as_ref(), | ||
| recycle: &sender.inner.recycle, | ||
| state: State::Start, | ||
| waiter: queue::Waiter::new(), | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, it's unfortunate that every time a poll_sender sends a new message, we'll allocate a new Box which is dropped when that message is sent. it would be preferable if there was a single heap allocation that lives for the entire lifetime of the sender, instead.
tokio_util's PollSender uses a ReusableBoxFuture type for this. I would be open to adding an optional tokio-util dependency that's only enabled when the pollable sender implementation is used, if it allowed us to avoid the allocation that occurs on every poll_reserve call...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Sure! I saw ReusableBoxFuture but have been afraid of adding an external dependent.
| enum PollSenderState<'a, T, R> { | ||
| Idle(Sender<T, R>), | ||
| Acquiring(Pin<Box<SendRefFuture<'a ,T, R>>>), | ||
| ReadyToSend(SendRef<'a, T>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it feels a little strange to me that the poll_reserve method stores the SendRef inside the PollSender and then consumes it when send is called, rather than just having poll_reserve return a SendRef to the user that can then be used to write a value. this API would allow the user to write to the slot in place through the SendRef, which isn't currently possible with the current API.
OTTOH, we would still need to store the SendRef internally in order to make the Sink impl work, but I think we could change the poll_reserve function to return a SendRef, and only store that SendRef internally when Sink::poll_ready is called? Of course, in order for that to work, we would need to add an owned SendRef variant, as you mentioned.
This would make the API somewhat different from tokio_util::sync's PollSender, but I think it's worth it to allow the user to actually write to a slot in place through a SendRef. If we wanted to present an identical API to tokio_util::sync::PollSender, we could also add a new poll method that returns a SendRef, and have poll_reserve just call that method and then store the returned SendRef if it returns Poll::Ready...what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I used the PollSender approach in order to not invent something new for something in place - but yes - the ability to directly return the SendRef is plausible. Having a different implementation than PollSender is not a problem for me personally because I assume that in the "normal" cases the Stream/Sink API is used.
If anyone calls poll_reserve, they normally know that they do and a difference to PollSender doesn't matter at all.
|
FWIW, there is already an open issue for adding owned variants to |
Implement `futures::future::Stream` for `mpsc::Receiver`. Add `mpsc::PollSender` that wrapps a mpsc::Sender and implements `Sink`.
Thanks for you reply. I totally agree that a separate PR with the owned send refs makes sense. I'd pause this PR until that is done. |
Implement
futures::future::Streamformpsc::Receiver. Addmpsc::PollSenderthat wrapps a mpsc::Sender and implementsSink. ThePollSenderimplementation is borrowed from tokio-util::sync::PollSender and slightly adopted.There's an lifetime issue, I'm currently unsure how to solve because
SendRefFutureneeds'sender, so the PR is marked as draft.Fixes #19